| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.anyList; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| import java.security.AccessControlException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CyclicBarrier; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Sets; |
| import org.apache.hadoop.yarn.MockApps; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeAttribute; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeKey; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeType; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeLabel; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueConfigurations; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.ReservationDefinition; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.ReservationRequest; |
| import org.apache.hadoop.yarn.api.records.ReservationRequests; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceInformation; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; |
| import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.UTCClock; |
| import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TestClientRMService { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestClientRMService.class); |
| |
| private RecordFactory recordFactory = RecordFactoryProvider |
| .getRecordFactory(null); |
| |
| private String appType = "MockApp"; |
| |
| private final static String QUEUE_1 = "Q-1"; |
| private final static String QUEUE_2 = "Q-2"; |
| private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo"; |
| private File resourceTypesFile = null; |
| |
| private Configuration conf; |
| private ResourceManager resourceManager; |
| private YarnRPC rpc; |
| private ApplicationClientProtocol client; |
| |
| @Test |
| public void testGetDecommissioningClusterNodes() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, |
| this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| int nodeMemory = 1024; |
| MockNM nm1 = rm.registerNode("host1:1234", nodeMemory); |
| rm.sendNodeStarted(nm1); |
| nm1.nodeHeartbeat(true); |
| rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); |
| Integer decommissioningTimeout = 600; |
| rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout); |
| rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); |
| |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Make call |
| List<NodeReport> nodeReports = client.getClusterNodes( |
| GetClusterNodesRequest.newInstance( |
| EnumSet.of(NodeState.DECOMMISSIONING))) |
| .getNodeReports(); |
| Assert.assertEquals(1, nodeReports.size()); |
| NodeReport nr = nodeReports.iterator().next(); |
| Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout()); |
| Assert.assertNull(nr.getNodeUpdateType()); |
| } |
| |
| @Test |
| public void testGetClusterNodes() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); |
| labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); |
| |
| // Add a healthy node with label = x |
| MockNM node = rm.registerNode("host1:1234", 1024); |
| Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); |
| map.put(node.getNodeId(), ImmutableSet.of("x")); |
| labelsMgr.replaceLabelsOnNode(map); |
| rm.sendNodeStarted(node); |
| node.nodeHeartbeat(true); |
| |
| // Add and lose a node with label = y |
| MockNM lostNode = rm.registerNode("host2:1235", 1024); |
| rm.sendNodeStarted(lostNode); |
| lostNode.nodeHeartbeat(true); |
| rm.waitForState(lostNode.getNodeId(), NodeState.RUNNING); |
| rm.sendNodeLost(lostNode); |
| |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Make call |
| GetClusterNodesRequest request = |
| GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING)); |
| List<NodeReport> nodeReports = |
| client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(1, nodeReports.size()); |
| Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY, |
| nodeReports.get(0).getNodeState()); |
| |
| // Check node's label = x |
| Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); |
| Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); |
| Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); |
| |
| // Now make the node unhealthy. |
| node.nodeHeartbeat(false); |
| rm.waitForState(node.getNodeId(), NodeState.UNHEALTHY); |
| |
| // Call again |
| nodeReports = client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals("Unhealthy nodes should not show up by default", 0, |
| nodeReports.size()); |
| |
| // Change label of host1 to y |
| map = new HashMap<NodeId, Set<String>>(); |
| map.put(node.getNodeId(), ImmutableSet.of("y")); |
| labelsMgr.replaceLabelsOnNode(map); |
| |
| // Now query for UNHEALTHY nodes |
| request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY)); |
| nodeReports = client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(1, nodeReports.size()); |
| Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, |
| nodeReports.get(0).getNodeState()); |
| |
| Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); |
| Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); |
| Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); |
| |
| // Remove labels of host1 |
| map = new HashMap<NodeId, Set<String>>(); |
| map.put(node.getNodeId(), ImmutableSet.of("y")); |
| labelsMgr.removeLabelsFromNode(map); |
| |
| // Query all states should return all nodes |
| rm.registerNode("host3:1236", 1024); |
| request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); |
| nodeReports = client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(3, nodeReports.size()); |
| |
| // All host1-3's label should be empty (instead of null) |
| for (NodeReport report : nodeReports) { |
| Assert.assertTrue(report.getNodeLabels() != null |
| && report.getNodeLabels().isEmpty()); |
| Assert.assertNull(report.getDecommissioningTimeout()); |
| Assert.assertNull(report.getNodeUpdateType()); |
| } |
| } |
| |
| @Test |
| public void testNonExistingApplicationReport() throws YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn( |
| new ConcurrentHashMap<ApplicationId, RMApp>()); |
| ClientRMService rmService = new ClientRMService(rmContext, null, null, |
| null, null, null); |
| GetApplicationReportRequest request = recordFactory |
| .newRecordInstance(GetApplicationReportRequest.class); |
| request.setApplicationId(ApplicationId.newInstance(0, 0)); |
| try { |
| rmService.getApplicationReport(request); |
| Assert.fail(); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.assertEquals(ex.getMessage(), |
| "Application with id '" + request.getApplicationId() |
| + "' doesn't exist in RM. Please check that the " |
| + "job submission was successful."); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationReport() throws Exception { |
| ResourceScheduler scheduler = mock(ResourceScheduler.class); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| |
| ApplicationId appId1 = getApplicationId(1); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| when( |
| mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); |
| |
| ClientRMService rmService = new ClientRMService(rmContext, scheduler, |
| null, mockAclsManager, null, null); |
| try { |
| GetApplicationReportRequest request = recordFactory |
| .newRecordInstance(GetApplicationReportRequest.class); |
| request.setApplicationId(appId1); |
| GetApplicationReportResponse response = |
| rmService.getApplicationReport(request); |
| ApplicationReport report = response.getApplicationReport(); |
| ApplicationResourceUsageReport usageReport = |
| report.getApplicationResourceUsageReport(); |
| Assert.assertEquals(10, usageReport.getMemorySeconds()); |
| Assert.assertEquals(3, usageReport.getVcoreSeconds()); |
| Assert.assertEquals("<Not set>", report.getAmNodeLabelExpression()); |
| Assert.assertEquals("<Not set>", report.getAppNodeLabelExpression()); |
| |
| // if application has am node label set to blank |
| ApplicationId appId2 = getApplicationId(2); |
| when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId2)).thenReturn(true); |
| request.setApplicationId(appId2); |
| response = rmService.getApplicationReport(request); |
| report = response.getApplicationReport(); |
| |
| Assert.assertEquals(NodeLabel.DEFAULT_NODE_LABEL_PARTITION, |
| report.getAmNodeLabelExpression()); |
| Assert.assertEquals(NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET, |
| report.getAppNodeLabelExpression()); |
| |
| // if application has am node label set to blank |
| ApplicationId appId3 = getApplicationId(3); |
| when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId3)).thenReturn(true); |
| |
| request.setApplicationId(appId3); |
| response = rmService.getApplicationReport(request); |
| report = response.getApplicationReport(); |
| |
| Assert.assertEquals("high-mem", report.getAmNodeLabelExpression()); |
| Assert.assertEquals("high-mem", report.getAppNodeLabelExpression()); |
| |
| // if application id is null |
| GetApplicationReportRequest invalidRequest = recordFactory |
| .newRecordInstance(GetApplicationReportRequest.class); |
| invalidRequest.setApplicationId(null); |
| try { |
| rmService.getApplicationReport(invalidRequest); |
| } catch (YarnException e) { |
| // rmService should return a ApplicationNotFoundException |
| // when a null application id is provided |
| Assert.assertTrue(e instanceof ApplicationNotFoundException); |
| } |
| } finally { |
| rmService.close(); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationAttemptReport() throws YarnException, |
| IOException { |
| ClientRMService rmService = createRMService(); |
| GetApplicationAttemptReportRequest request = recordFactory |
| .newRecordInstance(GetApplicationAttemptReportRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| request.setApplicationAttemptId(attemptId); |
| |
| try { |
| GetApplicationAttemptReportResponse response = rmService |
| .getApplicationAttemptReport(request); |
| Assert.assertEquals(attemptId, response.getApplicationAttemptReport() |
| .getApplicationAttemptId()); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationResourceUsageReportDummy() throws YarnException, |
| IOException { |
| ApplicationAttemptId attemptId = getApplicationAttemptId(1); |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) { |
| } |
| }); |
| ApplicationSubmissionContext asContext = |
| mock(ApplicationSubmissionContext.class); |
| YarnConfiguration config = new YarnConfiguration(); |
| RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, |
| rmContext, scheduler, null, asContext, config, null, null); |
| ApplicationResourceUsageReport report = rmAppAttemptImpl |
| .getApplicationResourceUsageReport(); |
| assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); |
| } |
| |
| @Test |
| public void testGetApplicationAttempts() throws YarnException, IOException { |
| ClientRMService rmService = createRMService(); |
| GetApplicationAttemptsRequest request = recordFactory |
| .newRecordInstance(GetApplicationAttemptsRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| request.setApplicationId(ApplicationId.newInstance(123456, 1)); |
| |
| try { |
| GetApplicationAttemptsResponse response = rmService |
| .getApplicationAttempts(request); |
| Assert.assertEquals(1, response.getApplicationAttemptList().size()); |
| Assert.assertEquals(attemptId, response.getApplicationAttemptList() |
| .get(0).getApplicationAttemptId()); |
| |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testGetContainerReport() throws YarnException, IOException { |
| ClientRMService rmService = createRMService(); |
| GetContainerReportRequest request = recordFactory |
| .newRecordInstance(GetContainerReportRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| ContainerId containerId = ContainerId.newContainerId(attemptId, 1); |
| request.setContainerId(containerId); |
| |
| try { |
| GetContainerReportResponse response = rmService |
| .getContainerReport(request); |
| Assert.assertEquals(containerId, response.getContainerReport() |
| .getContainerId()); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testGetContainers() throws YarnException, IOException { |
| ClientRMService rmService = createRMService(); |
| GetContainersRequest request = recordFactory |
| .newRecordInstance(GetContainersRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| ContainerId containerId = ContainerId.newContainerId(attemptId, 1); |
| request.setApplicationAttemptId(attemptId); |
| try { |
| GetContainersResponse response = rmService.getContainers(request); |
| Assert.assertEquals(containerId, response.getContainerList().get(0) |
| .getContainerId()); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| public ClientRMService createRMService() throws IOException, YarnException { |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, |
| scheduler); |
| when(rmContext.getRMApps()).thenReturn(apps); |
| when(rmContext.getYarnConfiguration()).thenReturn(new Configuration()); |
| RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, |
| mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) { |
| } |
| }); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when( |
| mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(), |
| any())).thenReturn(true); |
| return new ClientRMService(rmContext, scheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| } |
| |
| @Test |
| public void testForceKillNonExistingApplication() throws YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn( |
| new ConcurrentHashMap<ApplicationId, RMApp>()); |
| ClientRMService rmService = new ClientRMService(rmContext, null, null, |
| null, null, null); |
| ApplicationId applicationId = |
| BuilderUtils.newApplicationId(System.currentTimeMillis(), 0); |
| KillApplicationRequest request = |
| KillApplicationRequest.newInstance(applicationId); |
| try { |
| rmService.forceKillApplication(request); |
| Assert.fail(); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.assertEquals(ex.getMessage(), |
| "Trying to kill an absent " + |
| "application " + request.getApplicationId()); |
| } |
| } |
| |
| @Test |
| public void testApplicationTagsValidation() throws IOException { |
| conf = new YarnConfiguration(); |
| int maxtags = 3, appMaxTagLength = 5; |
| conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAGS, maxtags); |
| conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH, |
| appMaxTagLength); |
| MockRM rm = new MockRM(conf); |
| resourceManager = rm; |
| rm.init(conf); |
| rm.start(); |
| |
| ClientRMService rmService = rm.getClientRMService(); |
| |
| List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3", "Tag4"); |
| validateApplicationTag(rmService, tags, |
| "Too many applicationTags, a maximum of only " + maxtags |
| + " are allowed!"); |
| |
| tags = Arrays.asList("ApplicationTag1", "ApplicationTag2", |
| "ApplicationTag3"); |
| // tags are converted to lowercase in |
| // ApplicationSubmissionContext#setApplicationTags |
| validateApplicationTag(rmService, tags, |
| "Tag applicationtag1 is too long, maximum allowed length of a tag is " |
| + appMaxTagLength); |
| |
| tags = Arrays.asList("tãg1", "tag2#"); |
| validateApplicationTag(rmService, tags, |
| "A tag can only have ASCII characters! Invalid tag - tãg1"); |
| } |
| |
| private void validateApplicationTag(ClientRMService rmService, |
| List<String> tags, String errorMsg) { |
| SubmitApplicationRequest submitRequest = mockSubmitAppRequest( |
| getApplicationId(101), MockApps.newAppName(), QUEUE_1, |
| new HashSet<String>(tags)); |
| try { |
| rmService.submitApplication(submitRequest); |
| Assert.fail(); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getMessage().contains(errorMsg)); |
| } |
| } |
| |
| @Test |
| public void testForceKillApplication() throws Exception { |
| conf = new YarnConfiguration(); |
| conf.setBoolean(MockRM.ENABLE_WEBAPP, true); |
| MockRM rm = new MockRM(conf); |
| resourceManager = rm; |
| rm.init(conf); |
| rm.start(); |
| |
| ClientRMService rmService = rm.getClientRMService(); |
| GetApplicationsRequest getRequest = GetApplicationsRequest.newInstance( |
| EnumSet.of(YarnApplicationState.KILLED)); |
| |
| RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm); |
| MockRMAppSubmissionData data = |
| MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) |
| .withUnmanagedAM(true) |
| .build(); |
| RMApp app2 = MockRMAppSubmitter.submit(rm, data); |
| |
| assertEquals("Incorrect number of apps in the RM", 0, |
| rmService.getApplications(getRequest).getApplicationList().size()); |
| |
| KillApplicationRequest killRequest1 = |
| KillApplicationRequest.newInstance(app1.getApplicationId()); |
| String diagnostic = "message1"; |
| killRequest1.setDiagnostics(diagnostic); |
| KillApplicationRequest killRequest2 = |
| KillApplicationRequest.newInstance(app2.getApplicationId()); |
| |
| int killAttemptCount = 0; |
| for (int i = 0; i < 100; i++) { |
| KillApplicationResponse killResponse1 = |
| rmService.forceKillApplication(killRequest1); |
| killAttemptCount++; |
| if (killResponse1.getIsKillCompleted()) { |
| break; |
| } |
| Thread.sleep(10); |
| } |
| assertTrue("Kill attempt count should be greater than 1 for managed AMs", |
| killAttemptCount > 1); |
| assertEquals("Incorrect number of apps in the RM", 1, |
| rmService.getApplications(getRequest).getApplicationList().size()); |
| assertTrue("Diagnostic message is incorrect", |
| app1.getDiagnostics().toString().contains(diagnostic)); |
| |
| KillApplicationResponse killResponse2 = |
| rmService.forceKillApplication(killRequest2); |
| assertTrue("Killing UnmanagedAM should falsely acknowledge true", |
| killResponse2.getIsKillCompleted()); |
| for (int i = 0; i < 100; i++) { |
| if (2 == |
| rmService.getApplications(getRequest).getApplicationList().size()) { |
| break; |
| } |
| Thread.sleep(10); |
| } |
| assertEquals("Incorrect number of apps in the RM", 2, |
| rmService.getApplications(getRequest).getApplicationList().size()); |
| } |
| |
| @Test (expected = ApplicationNotFoundException.class) |
| public void testMoveAbsentApplication() throws YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn( |
| new ConcurrentHashMap<ApplicationId, RMApp>()); |
| ClientRMService rmService = new ClientRMService(rmContext, null, null, |
| null, null, null); |
| ApplicationId applicationId = |
| BuilderUtils.newApplicationId(System.currentTimeMillis(), 0); |
| MoveApplicationAcrossQueuesRequest request = |
| MoveApplicationAcrossQueuesRequest.newInstance(applicationId, |
| "newqueue"); |
| rmService.moveApplicationAcrossQueues(request); |
| } |
| |
| @Test |
| public void testMoveApplicationSubmitTargetQueue() throws Exception { |
| // move the application as the owner |
| ApplicationId applicationId = getApplicationId(1); |
| UserGroupInformation aclUGI = UserGroupInformation.getCurrentUser(); |
| QueueACLsManager queueACLsManager = getQueueAclManager("allowed_queue", |
| QueueACL.SUBMIT_APPLICATIONS, aclUGI); |
| ApplicationACLsManager appAclsManager = getAppAclManager(); |
| |
| ClientRMService rmService = createClientRMServiceForMoveApplicationRequest( |
| applicationId, aclUGI.getShortUserName(), appAclsManager, |
| queueACLsManager); |
| |
| // move as the owner queue in the acl |
| MoveApplicationAcrossQueuesRequest moveAppRequest = |
| MoveApplicationAcrossQueuesRequest. |
| newInstance(applicationId, "allowed_queue"); |
| rmService.moveApplicationAcrossQueues(moveAppRequest); |
| |
| // move as the owner queue not in the acl |
| moveAppRequest = MoveApplicationAcrossQueuesRequest.newInstance( |
| applicationId, "not_allowed"); |
| |
| try { |
| rmService.moveApplicationAcrossQueues(moveAppRequest); |
| Assert.fail("The request should fail with an AccessControlException"); |
| } catch (YarnException rex) { |
| Assert.assertTrue("AccessControlException is expected", |
| rex.getCause() instanceof AccessControlException); |
| } |
| |
| // ACL is owned by "moveuser", move is performed as a different user |
| aclUGI = UserGroupInformation.createUserForTesting("moveuser", |
| new String[]{}); |
| queueACLsManager = getQueueAclManager("move_queue", |
| QueueACL.SUBMIT_APPLICATIONS, aclUGI); |
| appAclsManager = getAppAclManager(); |
| ClientRMService rmService2 = |
| createClientRMServiceForMoveApplicationRequest(applicationId, |
| aclUGI.getShortUserName(), appAclsManager, queueACLsManager); |
| |
| // access to the queue not OK: user not allowed in this queue |
| MoveApplicationAcrossQueuesRequest moveAppRequest2 = |
| MoveApplicationAcrossQueuesRequest. |
| newInstance(applicationId, "move_queue"); |
| try { |
| rmService2.moveApplicationAcrossQueues(moveAppRequest2); |
| Assert.fail("The request should fail with an AccessControlException"); |
| } catch (YarnException rex) { |
| Assert.assertTrue("AccessControlException is expected", |
| rex.getCause() instanceof AccessControlException); |
| } |
| |
| // execute the move as the acl owner |
| // access to the queue OK: user allowed in this queue |
| aclUGI.doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public Object run() throws Exception { |
| return rmService2.moveApplicationAcrossQueues(moveAppRequest2); |
| } |
| }); |
| } |
| |
| @Test |
| public void testMoveApplicationAdminTargetQueue() throws Exception { |
| ApplicationId applicationId = getApplicationId(1); |
| UserGroupInformation aclUGI = UserGroupInformation.getCurrentUser(); |
| QueueACLsManager queueAclsManager = getQueueAclManager("allowed_queue", |
| QueueACL.ADMINISTER_QUEUE, aclUGI); |
| ApplicationACLsManager appAclsManager = getAppAclManager(); |
| ClientRMService rmService = |
| createClientRMServiceForMoveApplicationRequest(applicationId, |
| aclUGI.getShortUserName(), appAclsManager, queueAclsManager); |
| |
| // user is admin move to queue in acl |
| MoveApplicationAcrossQueuesRequest moveAppRequest = |
| MoveApplicationAcrossQueuesRequest.newInstance(applicationId, |
| "allowed_queue"); |
| rmService.moveApplicationAcrossQueues(moveAppRequest); |
| |
| // user is admin move to queue not in acl |
| moveAppRequest = MoveApplicationAcrossQueuesRequest.newInstance( |
| applicationId, "not_allowed"); |
| |
| try { |
| rmService.moveApplicationAcrossQueues(moveAppRequest); |
| Assert.fail("The request should fail with an AccessControlException"); |
| } catch (YarnException rex) { |
| Assert.assertTrue("AccessControlException is expected", |
| rex.getCause() instanceof AccessControlException); |
| } |
| |
| // ACL is owned by "moveuser", move is performed as a different user |
| aclUGI = UserGroupInformation.createUserForTesting("moveuser", |
| new String[]{}); |
| queueAclsManager = getQueueAclManager("move_queue", |
| QueueACL.ADMINISTER_QUEUE, aclUGI); |
| appAclsManager = getAppAclManager(); |
| ClientRMService rmService2 = |
| createClientRMServiceForMoveApplicationRequest(applicationId, |
| aclUGI.getShortUserName(), appAclsManager, queueAclsManager); |
| |
| // no access to this queue |
| MoveApplicationAcrossQueuesRequest moveAppRequest2 = |
| MoveApplicationAcrossQueuesRequest. |
| newInstance(applicationId, "move_queue"); |
| |
| try { |
| rmService2.moveApplicationAcrossQueues(moveAppRequest2); |
| Assert.fail("The request should fail with an AccessControlException"); |
| } catch (YarnException rex) { |
| Assert.assertTrue("AccessControlException is expected", |
| rex.getCause() instanceof AccessControlException); |
| } |
| |
| // execute the move as the acl owner |
| // access to the queue OK: user allowed in this queue |
| aclUGI.doAs(new PrivilegedExceptionAction<Object>() { |
| @Override |
| public Object run() throws Exception { |
| return rmService2.moveApplicationAcrossQueues(moveAppRequest2); |
| } |
| }); |
| } |
| |
| @Test (expected = YarnException.class) |
| public void testNonExistingQueue() throws Exception { |
| ApplicationId applicationId = getApplicationId(1); |
| UserGroupInformation aclUGI = UserGroupInformation.getCurrentUser(); |
| QueueACLsManager queueAclsManager = getQueueAclManager(); |
| ApplicationACLsManager appAclsManager = getAppAclManager(); |
| ClientRMService rmService = |
| createClientRMServiceForMoveApplicationRequest(applicationId, |
| aclUGI.getShortUserName(), appAclsManager, queueAclsManager); |
| |
| MoveApplicationAcrossQueuesRequest moveAppRequest = |
| MoveApplicationAcrossQueuesRequest.newInstance(applicationId, |
| "unknown_queue"); |
| rmService.moveApplicationAcrossQueues(moveAppRequest); |
| } |
| |
| /** |
| * Create an instance of ClientRMService for testing |
| * moveApplicationAcrossQueues requests. |
| * @param applicationId the application |
| * @return ClientRMService |
| */ |
| private ClientRMService createClientRMServiceForMoveApplicationRequest( |
| ApplicationId applicationId, String appOwner, |
| ApplicationACLsManager appAclsManager, |
| QueueACLsManager queueAclsManager) { |
| RMApp app = mock(RMApp.class); |
| when(app.getUser()).thenReturn(appOwner); |
| when(app.getState()).thenReturn(RMAppState.RUNNING); |
| when(app.getApplicationId()).thenReturn(applicationId); |
| ConcurrentHashMap<ApplicationId, RMApp> apps = new ConcurrentHashMap<>(); |
| apps.put(applicationId, app); |
| |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn(apps); |
| |
| RMAppManager rmAppManager = mock(RMAppManager.class); |
| return new ClientRMService(rmContext, null, rmAppManager, appAclsManager, |
| queueAclsManager, null); |
| } |
| |
| /** |
| * Plain application acl manager that always returns true. |
| * @return ApplicationACLsManager |
| */ |
| private ApplicationACLsManager getAppAclManager() { |
| ApplicationACLsManager aclsManager = mock(ApplicationACLsManager.class); |
| when(aclsManager.checkAccess( |
| any(UserGroupInformation.class), |
| any(ApplicationAccessType.class), |
| any(String.class), |
| any(ApplicationId.class))).thenReturn(true); |
| return aclsManager; |
| } |
| |
| /** |
| * Generate the Queue acl. |
| * @param allowedQueue the queue to allow the move to |
| * @param queueACL the acl to check: submit app or queue admin |
| * @param aclUser the user to check |
| * @return QueueACLsManager |
| */ |
| private QueueACLsManager getQueueAclManager(String allowedQueue, |
| QueueACL queueACL, UserGroupInformation aclUser) throws IOException { |
| // ACL that checks the queue is allowed |
| QueueACLsManager queueACLsManager = mock(QueueACLsManager.class); |
| when(queueACLsManager.checkAccess( |
| any(UserGroupInformation.class), |
| any(QueueACL.class), |
| any(RMApp.class), |
| any(), |
| any())).thenAnswer(new Answer<Boolean>() { |
| @Override |
| public Boolean answer(InvocationOnMock invocationOnMock) { |
| final UserGroupInformation user = |
| (UserGroupInformation) invocationOnMock.getArguments()[0]; |
| final QueueACL acl = |
| (QueueACL) invocationOnMock.getArguments()[1]; |
| return (queueACL.equals(acl) && |
| aclUser.getShortUserName().equals(user.getShortUserName())); |
| } |
| }); |
| |
| when(queueACLsManager.checkAccess( |
| any(UserGroupInformation.class), |
| any(QueueACL.class), |
| any(RMApp.class), |
| any(), |
| any(), |
| any(String.class))).thenAnswer(new Answer<Boolean>() { |
| @Override |
| public Boolean answer(InvocationOnMock invocationOnMock) { |
| final UserGroupInformation user = |
| (UserGroupInformation) invocationOnMock.getArguments()[0]; |
| final QueueACL acl = (QueueACL) invocationOnMock.getArguments()[1]; |
| final String queue = (String) invocationOnMock.getArguments()[5]; |
| return (allowedQueue.equals(queue) && queueACL.equals(acl) && |
| aclUser.getShortUserName().equals(user.getShortUserName())); |
| } |
| }); |
| return queueACLsManager; |
| } |
| |
| /** |
| * QueueACLsManager that always returns false when a target queue is passed |
| * in and true for other checks to simulate a missing queue. |
| * @return QueueACLsManager |
| */ |
| private QueueACLsManager getQueueAclManager() { |
| QueueACLsManager queueACLsManager = mock(QueueACLsManager.class); |
| when(queueACLsManager.checkAccess( |
| any(UserGroupInformation.class), |
| any(QueueACL.class), |
| any(RMApp.class), |
| any(String.class), |
| anyList(), |
| any(String.class))).thenReturn(false); |
| when(queueACLsManager.checkAccess( |
| any(UserGroupInformation.class), |
| any(QueueACL.class), |
| any(RMApp.class), |
| any(String.class), |
| anyList())).thenReturn(true); |
| return queueACLsManager; |
| } |
| |
| @Test |
| public void testGetQueueInfo() throws Exception { |
| ResourceScheduler scheduler = mock(ResourceScheduler.class); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(String.class), |
| any())) |
| .thenReturn(true); |
| when(mockAclsManager.checkAccess(any(UserGroupInformation.class), |
| any(ApplicationAccessType.class), any(), |
| any(ApplicationId.class))).thenReturn(true); |
| |
| ClientRMService rmService = new ClientRMService(rmContext, scheduler, |
| null, mockAclsManager, mockQueueACLsManager, null); |
| GetQueueInfoRequest request = recordFactory |
| .newRecordInstance(GetQueueInfoRequest.class); |
| request.setQueueName("testqueue"); |
| request.setIncludeApplications(true); |
| GetQueueInfoResponse queueInfo = rmService.getQueueInfo(request); |
| List<ApplicationReport> applications = queueInfo.getQueueInfo() |
| .getApplications(); |
| Assert.assertEquals(2, applications.size()); |
| Map<String, QueueConfigurations> queueConfigsByPartition = |
| queueInfo.getQueueInfo().getQueueConfigurations(); |
| Assert.assertEquals(1, queueConfigsByPartition.size()); |
| Assert.assertTrue(queueConfigsByPartition.containsKey("*")); |
| QueueConfigurations queueConfigs = queueConfigsByPartition.get("*"); |
| Assert.assertEquals(0.5f, queueConfigs.getCapacity(), 0.0001f); |
| Assert.assertEquals(0.1f, queueConfigs.getAbsoluteCapacity(), 0.0001f); |
| Assert.assertEquals(1.0f, queueConfigs.getMaxCapacity(), 0.0001f); |
| Assert.assertEquals(1.0f, queueConfigs.getAbsoluteMaxCapacity(), 0.0001f); |
| Assert.assertEquals(0.2f, queueConfigs.getMaxAMPercentage(), 0.0001f); |
| |
| request.setQueueName("nonexistentqueue"); |
| request.setIncludeApplications(true); |
| // should not throw exception on nonexistent queue |
| queueInfo = rmService.getQueueInfo(request); |
| |
| // Case where user does not have application access |
| ApplicationACLsManager mockAclsManager1 = |
| mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager1 = |
| mock(QueueACLsManager.class); |
| when(mockQueueACLsManager1.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(String.class), |
| any())) |
| .thenReturn(false); |
| when(mockAclsManager1.checkAccess(any(UserGroupInformation.class), |
| any(ApplicationAccessType.class), anyString(), |
| any(ApplicationId.class))).thenReturn(false); |
| |
| ClientRMService rmService1 = new ClientRMService(rmContext, scheduler, |
| null, mockAclsManager1, mockQueueACLsManager1, null); |
| request.setQueueName("testqueue"); |
| request.setIncludeApplications(true); |
| GetQueueInfoResponse queueInfo1 = rmService1.getQueueInfo(request); |
| List<ApplicationReport> applications1 = queueInfo1.getQueueInfo() |
| .getApplications(); |
| Assert.assertEquals(0, applications1.size()); |
| } |
| |
| @Test (timeout = 30000) |
| @SuppressWarnings ("rawtypes") |
| public void testAppSubmitWithSubmissionPreProcessor() throws Exception { |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| YarnConfiguration yConf = new YarnConfiguration(); |
| yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, |
| true); |
| yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); |
| // Override the YARN configuration. |
| when(rmContext.getYarnConfiguration()).thenReturn(yConf); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| RMAppManager appManager = new RMAppManager(rmContext, scheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) {} |
| }); |
| ApplicationId appId1 = getApplicationId(100); |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| when( |
| mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); |
| |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(String.class), |
| any())) |
| .thenReturn(true); |
| |
| ClientRMService rmService = |
| new ClientRMService(rmContext, scheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| File rulesFile = File.createTempFile("submission_rules", ".tmp"); |
| rulesFile.deleteOnExit(); |
| rulesFile.createNewFile(); |
| |
| yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH, |
| rulesFile.getAbsolutePath()); |
| rmService.serviceInit(yConf); |
| rmService.serviceStart(); |
| |
| BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile)); |
| writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1"); |
| writer.newLine(); |
| writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2"); |
| writer.newLine(); |
| writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); |
| writer.newLine(); |
| writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg"); |
| writer.newLine(); |
| writer.write("* TA=cluster:other Q=default NL=barfoo"); |
| writer.newLine(); |
| writer.write("host.testcluster1.com Q=default"); |
| writer.flush(); |
| writer.close(); |
| rmService.getContextPreProcessor().refresh(); |
| setupCurrentCall("host.cluster1.com"); |
| SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( |
| appId1, null, null); |
| try { |
| rmService.submitApplication(submitRequest1); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app1 = rmContext.getRMApps().get(appId1); |
| Assert.assertNotNull("app doesn't exist", app1); |
| Assert.assertEquals("app name doesn't match", |
| YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); |
| Assert.assertTrue("custom tag not present", |
| app1.getApplicationTags().contains("cluster:cluster1")); |
| Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue()); |
| Assert.assertEquals("app node label doesn't match", |
| "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression()); |
| setupCurrentCall("host.cluster2.com"); |
| ApplicationId appId2 = getApplicationId(101); |
| SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( |
| appId2, null, null); |
| submitRequest2.getApplicationSubmissionContext().setApplicationType( |
| "matchType"); |
| Set<String> aTags = new HashSet<String>(); |
| aTags.add(APPLICATION_TAG_SC_PREPROCESSOR); |
| submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags); |
| try { |
| rmService.submitApplication(submitRequest2); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app2 = rmContext.getRMApps().get(appId2); |
| Assert.assertNotNull("app doesn't exist", app2); |
| Assert.assertEquals("app name doesn't match", |
| YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName()); |
| Assert.assertTrue("client tag not present", |
| app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR)); |
| Assert.assertTrue("custom tag not present", |
| app2.getApplicationTags().contains("cluster:cluster2")); |
| Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue()); |
| Assert.assertEquals("app node label doesn't match", |
| "zuess", |
| app2.getApplicationSubmissionContext().getNodeLabelExpression()); |
| // Test Default commands |
| setupCurrentCall("host2.cluster3.com"); |
| ApplicationId appId3 = getApplicationId(102); |
| SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest( |
| appId3, null, null); |
| submitRequest3.getApplicationSubmissionContext().setApplicationType( |
| "matchType"); |
| submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags); |
| try { |
| rmService.submitApplication(submitRequest3); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app3 = rmContext.getRMApps().get(appId3); |
| Assert.assertNotNull("app doesn't exist", app3); |
| Assert.assertEquals("app name doesn't match", |
| YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName()); |
| Assert.assertTrue("client tag not present", |
| app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR)); |
| Assert.assertTrue("custom tag not present", |
| app3.getApplicationTags().contains("cluster:other")); |
| Assert.assertEquals("app queue doesn't match", "default", app3.getQueue()); |
| Assert.assertEquals("app node label doesn't match", |
| "barfoo", |
| app3.getApplicationSubmissionContext().getNodeLabelExpression()); |
| // Test regex |
| setupCurrentCall("host.cluster100.com"); |
| ApplicationId appId4 = getApplicationId(103); |
| SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest( |
| appId4, null, null); |
| try { |
| rmService.submitApplication(submitRequest4); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app4 = rmContext.getRMApps().get(appId4); |
| Assert.assertTrue("custom tag not present", |
| app4.getApplicationTags().contains("cluster:reg")); |
| Assert.assertEquals("app node label doesn't match", |
| "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression()); |
| testSubmissionContextWithAbsentTAG(rmService, rmContext); |
| rmService.serviceStop(); |
| } |
| |
| private void testSubmissionContextWithAbsentTAG(ClientRMService rmService, |
| RMContext rmContext) throws Exception { |
| setupCurrentCall("host.testcluster1.com"); |
| ApplicationId appId5 = getApplicationId(104); |
| SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest( |
| appId5, null, null); |
| try { |
| rmService.submitApplication(submitRequest5); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app5 = rmContext.getRMApps().get(appId5); |
| Assert.assertEquals("custom tag present", |
| app5.getApplicationTags().size(), 0); |
| Assert.assertNull("app node label present", |
| app5.getApplicationSubmissionContext().getNodeLabelExpression()); |
| Assert.assertEquals("Queue name is not present", |
| app5.getQueue(), "default"); |
| } |
| private void setupCurrentCall(String hostName) throws UnknownHostException { |
| Server.Call mockCall = mock(Server.Call.class); |
| when(mockCall.getHostInetAddress()).thenReturn( |
| InetAddress.getByAddress(hostName, |
| new byte[]{123, 123, 123, 123})); |
| Server.getCurCall().set(mockCall); |
| } |
| |
| @Test (timeout = 30000) |
| @SuppressWarnings ("rawtypes") |
| public void testAppSubmit() throws Exception { |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| RMAppManager appManager = new RMAppManager(rmContext, scheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) {} |
| }); |
| doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) |
| .getRMTimelineCollectorManager(); |
| |
| ApplicationId appId1 = getApplicationId(100); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| when( |
| mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); |
| |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(String.class), |
| any())) |
| .thenReturn(true); |
| ClientRMService rmService = |
| new ClientRMService(rmContext, scheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| rmService.init(new Configuration()); |
| |
| // without name and queue |
| |
| SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( |
| appId1, null, null); |
| try { |
| rmService.submitApplication(submitRequest1); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app1 = rmContext.getRMApps().get(appId1); |
| Assert.assertNotNull("app doesn't exist", app1); |
| Assert.assertEquals("app name doesn't match", |
| YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); |
| Assert.assertEquals("app queue doesn't match", |
| YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue()); |
| |
| // with name and queue |
| String name = MockApps.newAppName(); |
| String queue = MockApps.newQueue(); |
| ApplicationId appId2 = getApplicationId(101); |
| SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( |
| appId2, name, queue); |
| submitRequest2.getApplicationSubmissionContext().setApplicationType( |
| "matchType"); |
| try { |
| rmService.submitApplication(submitRequest2); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app2 = rmContext.getRMApps().get(appId2); |
| Assert.assertNotNull("app doesn't exist", app2); |
| Assert.assertEquals("app name doesn't match", name, app2.getName()); |
| Assert.assertEquals("app queue doesn't match", queue, app2.getQueue()); |
| |
| // duplicate appId |
| try { |
| rmService.submitApplication(submitRequest2); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| |
| GetApplicationsRequest getAllAppsRequest = |
| GetApplicationsRequest.newInstance(new HashSet<String>()); |
| GetApplicationsResponse getAllApplicationsResponse = |
| rmService.getApplications(getAllAppsRequest); |
| Assert.assertEquals(5, |
| getAllApplicationsResponse.getApplicationList().size()); |
| |
| Set<String> appTypes = new HashSet<String>(); |
| appTypes.add("matchType"); |
| |
| getAllAppsRequest = GetApplicationsRequest.newInstance(appTypes); |
| getAllApplicationsResponse = |
| rmService.getApplications(getAllAppsRequest); |
| Assert.assertEquals(1, |
| getAllApplicationsResponse.getApplicationList().size()); |
| Assert.assertEquals(appId2, |
| getAllApplicationsResponse.getApplicationList() |
| .get(0).getApplicationId()); |
| |
| // Test query with uppercase appType also works |
| appTypes = new HashSet<String>(); |
| appTypes.add("MATCHTYPE"); |
| getAllAppsRequest = GetApplicationsRequest.newInstance(appTypes); |
| getAllApplicationsResponse = |
| rmService.getApplications(getAllAppsRequest); |
| Assert.assertEquals(1, |
| getAllApplicationsResponse.getApplicationList().size()); |
| Assert.assertEquals(appId2, |
| getAllApplicationsResponse.getApplicationList() |
| .get(0).getApplicationId()); |
| } |
| |
| @Test |
| public void testGetApplications() throws Exception { |
| /** |
| * 1. Submit 3 applications alternately in two queues |
| * 2. Test each of the filters |
| */ |
| // Basic setup |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) |
| .getRMTimelineCollectorManager(); |
| |
| RMAppManager appManager = new RMAppManager(rmContext, scheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) {} |
| }); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(), |
| any())) |
| .thenReturn(true); |
| ClientRMService rmService = |
| new ClientRMService(rmContext, scheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| rmService.init(new Configuration()); |
| |
| // Initialize appnames and queues |
| String[] queues = {QUEUE_1, QUEUE_2}; |
| String[] appNames = |
| {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()}; |
| ApplicationId[] appIds = |
| {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; |
| List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3"); |
| |
| long[] submitTimeMillis = new long[3]; |
| // Submit applications |
| for (int i = 0; i < appIds.length; i++) { |
| ApplicationId appId = appIds[i]; |
| when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true); |
| SubmitApplicationRequest submitRequest = mockSubmitAppRequest( |
| appId, appNames[i], queues[i % queues.length], |
| new HashSet<String>(tags.subList(0, i + 1))); |
| // make sure each app is submitted at a different time |
| Thread.sleep(1); |
| rmService.submitApplication(submitRequest); |
| submitTimeMillis[i] = rmService.getApplicationReport( |
| GetApplicationReportRequest.newInstance(appId)) |
| .getApplicationReport().getStartTime(); |
| } |
| |
| // Test different cases of ClientRMService#getApplications() |
| GetApplicationsRequest request = GetApplicationsRequest.newInstance(); |
| assertEquals("Incorrect total number of apps", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check limit |
| request.setLimit(1L); |
| assertEquals("Failed to limit applications", 1, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check start range |
| request = GetApplicationsRequest.newInstance(); |
| request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis()); |
| |
| // 2 applications are submitted after first timeMills |
| assertEquals("Incorrect number of matching start range", |
| 2, rmService.getApplications(request).getApplicationList().size()); |
| |
| // 1 application is submitted after the second timeMills |
| request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis()); |
| assertEquals("Incorrect number of matching start range", |
| 1, rmService.getApplications(request).getApplicationList().size()); |
| |
| // no application is submitted after the third timeMills |
| request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis()); |
| assertEquals("Incorrect number of matching start range", |
| 0, rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check queue |
| request = GetApplicationsRequest.newInstance(); |
| Set<String> queueSet = new HashSet<String>(); |
| request.setQueues(queueSet); |
| |
| queueSet.add(queues[0]); |
| assertEquals("Incorrect number of applications in queue", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| assertEquals("Incorrect number of applications in queue", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| queueSet.add(queues[1]); |
| assertEquals("Incorrect number of applications in queue", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check user |
| request = GetApplicationsRequest.newInstance(); |
| Set<String> userSet = new HashSet<String>(); |
| request.setUsers(userSet); |
| |
| userSet.add("random-user-name"); |
| assertEquals("Incorrect number of applications for user", 0, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| userSet.add(UserGroupInformation.getCurrentUser().getShortUserName()); |
| assertEquals("Incorrect number of applications for user", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| rmService.setDisplayPerUserApps(true); |
| userSet.clear(); |
| assertEquals("Incorrect number of applications for user", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| rmService.setDisplayPerUserApps(false); |
| |
| // Check tags |
| request = GetApplicationsRequest.newInstance( |
| ApplicationsRequestScope.ALL, null, null, null, null, null, null, |
| null, null); |
| Set<String> tagSet = new HashSet<String>(); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| tagSet = Sets.newHashSet(tags.get(0)); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| tagSet = Sets.newHashSet(tags.get(1)); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 2, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| tagSet = Sets.newHashSet(tags.get(2)); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 1, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check scope |
| request = GetApplicationsRequest.newInstance( |
| ApplicationsRequestScope.VIEWABLE); |
| assertEquals("Incorrect number of applications for the scope", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| request = GetApplicationsRequest.newInstance( |
| ApplicationsRequestScope.OWN); |
| assertEquals("Incorrect number of applications for the scope", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| } |
| |
| @Test(timeout=4000) |
| public void testConcurrentAppSubmit() |
| throws IOException, InterruptedException, BrokenBarrierException, |
| YarnException { |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| RMAppManager appManager = new RMAppManager(rmContext, scheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| |
| final ApplicationId appId1 = getApplicationId(100); |
| final ApplicationId appId2 = getApplicationId(101); |
| final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( |
| appId1, null, null); |
| final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( |
| appId2, null, null); |
| |
| final CyclicBarrier startBarrier = new CyclicBarrier(2); |
| final CyclicBarrier endBarrier = new CyclicBarrier(2); |
| |
| EventHandler<Event> eventHandler = new EventHandler<Event>() { |
| @Override |
| public void handle(Event rawEvent) { |
| if (rawEvent instanceof RMAppEvent) { |
| RMAppEvent event = (RMAppEvent) rawEvent; |
| if (event.getApplicationId().equals(appId1)) { |
| try { |
| startBarrier.await(); |
| endBarrier.await(); |
| } catch (BrokenBarrierException e) { |
| LOG.warn("Broken Barrier", e); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while awaiting barriers", e); |
| } |
| } |
| } |
| } |
| }; |
| |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler); |
| doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) |
| .getRMTimelineCollectorManager(); |
| |
| final ClientRMService rmService = |
| new ClientRMService(rmContext, scheduler, appManager, null, null, |
| null); |
| rmService.init(new Configuration()); |
| |
| // submit an app and wait for it to block while in app submission |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| try { |
| rmService.submitApplication(submitRequest1); |
| } catch (YarnException | IOException e) {} |
| } |
| }; |
| t.start(); |
| |
| // submit another app, so go through while the first app is blocked |
| startBarrier.await(); |
| rmService.submitApplication(submitRequest2); |
| endBarrier.await(); |
| t.join(); |
| } |
| |
| private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, |
| String name, String queue) { |
| return mockSubmitAppRequest(appId, name, queue, null); |
| } |
| |
| private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, |
| String name, String queue, Set<String> tags) { |
| return mockSubmitAppRequest(appId, name, queue, tags, false); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, |
| String name, String queue, Set<String> tags, boolean unmanaged) { |
| |
| ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); |
| |
| Resource resource = Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); |
| |
| ApplicationSubmissionContext submissionContext = |
| recordFactory.newRecordInstance(ApplicationSubmissionContext.class); |
| submissionContext.setAMContainerSpec(amContainerSpec); |
| submissionContext.setApplicationName(name); |
| submissionContext.setQueue(queue); |
| submissionContext.setApplicationId(appId); |
| submissionContext.setResource(resource); |
| submissionContext.setApplicationType(appType); |
| submissionContext.setApplicationTags(tags); |
| submissionContext.setUnmanagedAM(unmanaged); |
| submissionContext.setPriority(Priority.newInstance(0)); |
| |
| SubmitApplicationRequest submitRequest = |
| recordFactory.newRecordInstance(SubmitApplicationRequest.class); |
| submitRequest.setApplicationSubmissionContext(submissionContext); |
| return submitRequest; |
| } |
| |
| private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) |
| throws IOException { |
| Dispatcher dispatcher = mock(Dispatcher.class); |
| when(rmContext.getDispatcher()).thenReturn(dispatcher); |
| @SuppressWarnings("unchecked") |
| EventHandler<Event> eventHandler = mock(EventHandler.class); |
| when(dispatcher.getEventHandler()).thenReturn(eventHandler); |
| |
| QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class); |
| queInfo.setQueueName("testqueue"); |
| QueueConfigurations queueConfigs = |
| recordFactory.newRecordInstance(QueueConfigurations.class); |
| queueConfigs.setCapacity(0.5f); |
| queueConfigs.setAbsoluteCapacity(0.1f); |
| queueConfigs.setMaxCapacity(1.0f); |
| queueConfigs.setAbsoluteMaxCapacity(1.0f); |
| queueConfigs.setMaxAMPercentage(0.2f); |
| Map<String, QueueConfigurations> queueConfigsByPartition = |
| new HashMap<>(); |
| queueConfigsByPartition.put("*", queueConfigs); |
| queInfo.setQueueConfigurations(queueConfigsByPartition); |
| |
| when(scheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean())) |
| .thenReturn(queInfo); |
| when(scheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), |
| anyBoolean())).thenThrow(new IOException("queue does not exist")); |
| RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); |
| when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); |
| SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); |
| when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); |
| when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); |
| ConcurrentHashMap<ApplicationId, RMApp> apps = |
| getRMApps(rmContext, scheduler); |
| when(rmContext.getRMApps()).thenReturn(apps); |
| when(scheduler.getAppsInQueue(eq("testqueue"))).thenReturn( |
| getSchedulerApps(apps)); |
| when(rmContext.getScheduler()).thenReturn(scheduler); |
| } |
| |
| private ConcurrentHashMap<ApplicationId, RMApp> getRMApps( |
| RMContext rmContext, YarnScheduler yarnScheduler) { |
| ConcurrentHashMap<ApplicationId, RMApp> apps = |
| new ConcurrentHashMap<ApplicationId, RMApp>(); |
| ApplicationId applicationId1 = getApplicationId(1); |
| ApplicationId applicationId2 = getApplicationId(2); |
| ApplicationId applicationId3 = getApplicationId(3); |
| YarnConfiguration config = new YarnConfiguration(); |
| apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1, |
| config, "testqueue", 10, 3, null, null)); |
| apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2, |
| config, "a", 20, 2, null, "")); |
| apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3, |
| config, "testqueue", 40, 5, "high-mem", "high-mem")); |
| return apps; |
| } |
| |
| private List<ApplicationAttemptId> getSchedulerApps( |
| Map<ApplicationId, RMApp> apps) { |
| List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>(); |
| // Return app IDs for the apps in testqueue (as defined in getRMApps) |
| schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0)); |
| schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0)); |
| return schedApps; |
| } |
| |
| private static ApplicationId getApplicationId(int id) { |
| return ApplicationId.newInstance(123456, id); |
| } |
| |
| private static ApplicationAttemptId getApplicationAttemptId(int id) { |
| return ApplicationAttemptId.newInstance(getApplicationId(id), 1); |
| } |
| |
| private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, |
| ApplicationId applicationId3, YarnConfiguration config, String queueName, |
| final long memorySeconds, final long vcoreSeconds, |
| String appNodeLabelExpression, String amNodeLabelExpression) { |
| ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); |
| when(asContext.getMaxAppAttempts()).thenReturn(1); |
| when(asContext.getNodeLabelExpression()).thenReturn(appNodeLabelExpression); |
| when(asContext.getPriority()).thenReturn(Priority.newInstance(0)); |
| RMAppImpl app = |
| spy(new RMAppImpl(applicationId3, rmContext, config, null, null, |
| queueName, asContext, yarnScheduler, null, |
| System.currentTimeMillis(), "YARN", null, |
| Collections.singletonList(BuilderUtils.newResourceRequest( |
| RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, |
| Resource.newInstance(1024, 1), 1))){ |
| @Override |
| public ApplicationReport createAndGetApplicationReport( |
| String clientUserName, boolean allowAccess) { |
| ApplicationReport report = super.createAndGetApplicationReport( |
| clientUserName, allowAccess); |
| ApplicationResourceUsageReport usageReport = |
| report.getApplicationResourceUsageReport(); |
| usageReport.setMemorySeconds(memorySeconds); |
| usageReport.setVcoreSeconds(vcoreSeconds); |
| report.setApplicationResourceUsageReport(usageReport); |
| return report; |
| } |
| }); |
| app.getAMResourceRequests().get(0) |
| .setNodeLabelExpression(amNodeLabelExpression); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId, |
| rmContext, yarnScheduler, null, asContext, config, null, app)); |
| Container container = Container.newInstance( |
| ContainerId.newContainerId(attemptId, 1), null, |
| "", null, null, null); |
| RMContainerImpl containerimpl = spy(new RMContainerImpl(container, |
| SchedulerRequestKey.extractFrom(container), attemptId, null, "", |
| rmContext)); |
| Map<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>(); |
| attempts.put(attemptId, rmAppAttemptImpl); |
| when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); |
| when(app.getAppAttempts()).thenReturn(attempts); |
| when(app.getApplicationPriority()).thenReturn(Priority.newInstance(0)); |
| when(rmAppAttemptImpl.getMasterContainer()).thenReturn(container); |
| ResourceScheduler rs = mock(ResourceScheduler.class); |
| when(rmContext.getScheduler()).thenReturn(rs); |
| when(rmContext.getScheduler().getRMContainer(any(ContainerId.class))) |
| .thenReturn(containerimpl); |
| SchedulerAppReport sAppReport = mock(SchedulerAppReport.class); |
| when( |
| rmContext.getScheduler().getSchedulerAppInfo( |
| any(ApplicationAttemptId.class))).thenReturn(sAppReport); |
| List<RMContainer> rmContainers = new ArrayList<RMContainer>(); |
| rmContainers.add(containerimpl); |
| when( |
| rmContext.getScheduler().getSchedulerAppInfo(attemptId) |
| .getLiveContainers()).thenReturn(rmContainers); |
| ContainerStatus cs = mock(ContainerStatus.class); |
| when(containerimpl.completed()).thenReturn(false); |
| when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A"); |
| when(containerimpl.getContainerExitStatus()).thenReturn(0); |
| when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE); |
| return app; |
| } |
| |
| private static ResourceScheduler mockResourceScheduler() |
| throws YarnException { |
| ResourceScheduler scheduler = mock(ResourceScheduler.class); |
| when(scheduler.getMinimumResourceCapability()).thenReturn( |
| Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); |
| when(scheduler.getMaximumResourceCapability()).thenReturn( |
| Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); |
| when(scheduler.getMaximumResourceCapability(anyString())).thenReturn( |
| Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); |
| when(scheduler.getAppsInQueue(QUEUE_1)).thenReturn( |
| Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102))); |
| when(scheduler.getAppsInQueue(QUEUE_2)).thenReturn( |
| Arrays.asList(getApplicationAttemptId(103))); |
| ApplicationAttemptId attemptId = getApplicationAttemptId(1); |
| when(scheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); |
| |
| ResourceCalculator rs = mock(ResourceCalculator.class); |
| when(scheduler.getResourceCalculator()).thenReturn(rs); |
| |
| when(scheduler.checkAndGetApplicationPriority(any(Priority.class), |
| any(UserGroupInformation.class), anyString(), any(ApplicationId.class))) |
| .thenReturn(Priority.newInstance(0)); |
| return scheduler; |
| } |
| |
| private ResourceManager setupResourceManager() { |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| ReservationSystemTestUtil.setupQueueConfiguration(conf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); |
| MockRM rm = new MockRM(conf); |
| resourceManager = rm; |
| rm.start(); |
| try { |
| rm.registerNode("127.0.0.1:1", 102400, 100); |
| // allow plan follower to synchronize |
| Thread.sleep(1050); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| return rm; |
| } |
| |
| private ReservationSubmissionRequest submitReservationTestHelper( |
| ClientRMService clientService, long arrival, long deadline, |
| long duration) { |
| ReservationSubmissionResponse sResponse = null; |
| GetNewReservationRequest newReservationRequest = |
| GetNewReservationRequest.newInstance(); |
| ReservationId reservationID = null; |
| try { |
| reservationID = clientService.getNewReservation(newReservationRequest) |
| .getReservationId(); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| ReservationSubmissionRequest sRequest = |
| ReservationSystemTestUtil.createSimpleReservationRequest(reservationID, |
| 4, arrival, deadline, duration); |
| try { |
| sResponse = clientService.submitReservation(sRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(sResponse); |
| Assert.assertNotNull(reservationID); |
| System.out.println("Submit reservation response: " + reservationID); |
| return sRequest; |
| } |
| |
| @Test |
| public void testCreateReservation() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| // Submit the reservation again with the same request and make sure it |
| // passes. |
| try { |
| clientService.submitReservation(sRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| |
| // Submit the reservation with the same reservation id but different |
| // reservation definition, and ensure YarnException is thrown. |
| arrival = clock.getTime(); |
| ReservationDefinition rDef = sRequest.getReservationDefinition(); |
| rDef.setArrival(arrival + duration); |
| sRequest.setReservationDefinition(rDef); |
| try { |
| clientService.submitReservation(sRequest); |
| Assert.fail("Reservation submission should fail if a duplicate " |
| + "reservation id is used, but the reservation definition has been " |
| + "updated."); |
| } catch (Exception e) { |
| Assert.assertTrue(e instanceof YarnException); |
| } |
| } |
| |
| @Test |
| public void testUpdateReservation() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| ReservationDefinition rDef = sRequest.getReservationDefinition(); |
| ReservationRequest rr = |
| rDef.getReservationRequests().getReservationResources().get(0); |
| ReservationId reservationID = sRequest.getReservationId(); |
| rr.setNumContainers(5); |
| arrival = clock.getTime(); |
| duration = 30000; |
| deadline = (long) (arrival + 1.05 * duration); |
| rr.setDuration(duration); |
| rDef.setArrival(arrival); |
| rDef.setDeadline(deadline); |
| ReservationUpdateRequest uRequest = |
| ReservationUpdateRequest.newInstance(rDef, reservationID); |
| ReservationUpdateResponse uResponse = null; |
| try { |
| uResponse = clientService.updateReservation(uRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(uResponse); |
| System.out.println("Update reservation response: " + uResponse); |
| } |
| |
| @Test |
| public void testListReservationsByReservationId() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| ReservationId reservationID = sRequest.getReservationId(); |
| ReservationListResponse response = null; |
| ReservationListRequest request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1, |
| -1, false); |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(response); |
| Assert.assertEquals(1, response.getReservationAllocationState().size()); |
| Assert.assertEquals(response.getReservationAllocationState().get(0) |
| .getReservationId().getId(), reservationID.getId()); |
| Assert.assertEquals(response.getReservationAllocationState().get(0) |
| .getResourceAllocationRequests().size(), 0); |
| } |
| |
| @Test |
| public void testListReservationsByTimeInterval() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| // List reservations, search by a point in time within the reservation |
| // range. |
| arrival = clock.getTime(); |
| ReservationId reservationID = sRequest.getReservationId(); |
| ReservationListRequest request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2, |
| arrival + duration / 2, true); |
| |
| ReservationListResponse response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(response); |
| Assert.assertEquals(1, response.getReservationAllocationState().size()); |
| Assert.assertEquals(response.getReservationAllocationState().get(0) |
| .getReservationId().getId(), reservationID.getId()); |
| // List reservations, search by time within reservation interval. |
| request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true); |
| |
| response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(response); |
| Assert.assertEquals(1, response.getReservationAllocationState().size()); |
| Assert.assertEquals(response.getReservationAllocationState().get(0) |
| .getReservationId().getId(), reservationID.getId()); |
| // Verify that the full resource allocations exist. |
| Assert.assertTrue(response.getReservationAllocationState().get(0) |
| .getResourceAllocationRequests().size() > 0); |
| |
| // Verify that the full RDL is returned. |
| ReservationRequests reservationRequests = |
| response.getReservationAllocationState().get(0) |
| .getReservationDefinition().getReservationRequests(); |
| Assert.assertEquals("R_ALL", |
| reservationRequests.getInterpreter().toString()); |
| Assert.assertTrue(reservationRequests.getReservationResources().get(0) |
| .getDuration() == duration); |
| } |
| |
| @Test |
| public void testListReservationsByInvalidTimeInterval() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| // List reservations, search by invalid end time == -1. |
| ReservationListRequest request = ReservationListRequest |
| .newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true); |
| |
| ReservationListResponse response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(response); |
| Assert.assertEquals(1, response.getReservationAllocationState().size()); |
| Assert.assertEquals(response.getReservationAllocationState().get(0) |
| .getReservationId().getId(), sRequest.getReservationId().getId()); |
| |
| // List reservations, search by invalid end time < -1. |
| request = ReservationListRequest |
| .newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -10, true); |
| |
| response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(response); |
| Assert.assertEquals(1, response.getReservationAllocationState().size()); |
| Assert.assertEquals(response.getReservationAllocationState().get(0) |
| .getReservationId().getId(), sRequest.getReservationId().getId()); |
| } |
| |
| @Test |
| public void testListReservationsByTimeIntervalContainingNoReservations() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| // List reservations, search by very large start time. |
| ReservationListRequest request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1, false); |
| |
| ReservationListResponse response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| |
| // Ensure all reservations are filtered out. |
| Assert.assertNotNull(response); |
| assertThat(response.getReservationAllocationState()).isEmpty(); |
| |
| duration = 30000; |
| deadline = sRequest.getReservationDefinition().getDeadline(); |
| |
| // List reservations, search by start time after the reservation |
| // end time. |
| request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, "", deadline + duration, |
| deadline + 2 * duration, false); |
| |
| response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| |
| // Ensure all reservations are filtered out. |
| Assert.assertNotNull(response); |
| assertThat(response.getReservationAllocationState()).isEmpty(); |
| |
| arrival = clock.getTime(); |
| // List reservations, search by end time before the reservation start |
| // time. |
| request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration, |
| false); |
| |
| response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| |
| // Ensure all reservations are filtered out. |
| Assert.assertNotNull(response); |
| assertThat(response.getReservationAllocationState()).isEmpty(); |
| |
| // List reservations, search by very small end time. |
| request = ReservationListRequest |
| .newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false); |
| |
| response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| |
| // Ensure all reservations are filtered out. |
| Assert.assertNotNull(response); |
| assertThat(response.getReservationAllocationState()).isEmpty(); |
| } |
| |
| @Test |
| public void testReservationDelete() { |
| resourceManager = setupResourceManager(); |
| ClientRMService clientService = resourceManager.getClientRMService(); |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| submitReservationTestHelper(clientService, arrival, deadline, duration); |
| |
| ReservationId reservationID = sRequest.getReservationId(); |
| // Delete the reservation |
| ReservationDeleteRequest dRequest = |
| ReservationDeleteRequest.newInstance(reservationID); |
| ReservationDeleteResponse dResponse = null; |
| try { |
| dResponse = clientService.deleteReservation(dRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(dResponse); |
| System.out.println("Delete reservation response: " + dResponse); |
| |
| // List reservations, search by non-existent reservationID |
| ReservationListRequest request = ReservationListRequest.newInstance( |
| ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1, |
| -1, false); |
| |
| ReservationListResponse response = null; |
| try { |
| response = clientService.listReservations(request); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(response); |
| Assert.assertEquals(0, response.getReservationAllocationState().size()); |
| } |
| |
| @Test |
| public void testGetNodeLabels() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, |
| this.queueACLsManager, this.getRMContext() |
| .getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| NodeLabel labelX = NodeLabel.newInstance("x", false); |
| NodeLabel labelY = NodeLabel.newInstance("y"); |
| RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); |
| labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY)); |
| |
| NodeId node1 = NodeId.newInstance("host1", 1234); |
| NodeId node2 = NodeId.newInstance("host2", 1234); |
| Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); |
| map.put(node1, ImmutableSet.of("x")); |
| map.put(node2, ImmutableSet.of("y")); |
| labelsMgr.replaceLabelsOnNode(map); |
| |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Get node labels collection |
| GetClusterNodeLabelsResponse response = client |
| .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); |
| Assert.assertTrue(response.getNodeLabelList().containsAll( |
| Arrays.asList(labelX, labelY))); |
| |
| // Get node labels mapping |
| GetNodesToLabelsResponse response1 = client |
| .getNodeToLabels(GetNodesToLabelsRequest.newInstance()); |
| Map<NodeId, Set<String>> nodeToLabels = response1.getNodeToLabels(); |
| Assert.assertTrue(nodeToLabels.keySet().containsAll( |
| Arrays.asList(node1, node2))); |
| Assert.assertTrue(nodeToLabels.get(node1) |
| .containsAll(Arrays.asList(labelX.getName()))); |
| Assert.assertTrue(nodeToLabels.get(node2) |
| .containsAll(Arrays.asList(labelY.getName()))); |
| // Below label "x" is not present in the response as exclusivity is true |
| Assert.assertFalse(nodeToLabels.get(node1).containsAll( |
| Arrays.asList(NodeLabel.newInstance("x")))); |
| } |
| |
| @Test |
| public void testGetLabelsToNodes() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, |
| this.queueACLsManager, this.getRMContext() |
| .getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| NodeLabel labelX = NodeLabel.newInstance("x", false); |
| NodeLabel labelY = NodeLabel.newInstance("y", false); |
| NodeLabel labelZ = NodeLabel.newInstance("z", false); |
| RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); |
| labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY, labelZ)); |
| |
| NodeId node1A = NodeId.newInstance("host1", 1234); |
| NodeId node1B = NodeId.newInstance("host1", 5678); |
| NodeId node2A = NodeId.newInstance("host2", 1234); |
| NodeId node3A = NodeId.newInstance("host3", 1234); |
| NodeId node3B = NodeId.newInstance("host3", 5678); |
| Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); |
| map.put(node1A, ImmutableSet.of("x")); |
| map.put(node1B, ImmutableSet.of("z")); |
| map.put(node2A, ImmutableSet.of("y")); |
| map.put(node3A, ImmutableSet.of("y")); |
| map.put(node3B, ImmutableSet.of("z")); |
| labelsMgr.replaceLabelsOnNode(map); |
| |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Get node labels collection |
| GetClusterNodeLabelsResponse response = client |
| .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); |
| Assert.assertTrue(response.getNodeLabelList().containsAll( |
| Arrays.asList(labelX, labelY, labelZ))); |
| |
| // Get labels to nodes mapping |
| GetLabelsToNodesResponse response1 = client |
| .getLabelsToNodes(GetLabelsToNodesRequest.newInstance()); |
| Map<String, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes(); |
| Assert.assertTrue(labelsToNodes.keySet().containsAll( |
| Arrays.asList(labelX.getName(), labelY.getName(), labelZ.getName()))); |
| Assert.assertTrue(labelsToNodes.get(labelX.getName()).containsAll( |
| Arrays.asList(node1A))); |
| Assert.assertTrue(labelsToNodes.get(labelY.getName()).containsAll( |
| Arrays.asList(node2A, node3A))); |
| Assert.assertTrue(labelsToNodes.get(labelZ.getName()).containsAll( |
| Arrays.asList(node1B, node3B))); |
| |
| // Get labels to nodes mapping for specific labels |
| Set<String> setlabels = new HashSet<String>(Arrays.asList(new String[]{"x", |
| "z"})); |
| GetLabelsToNodesResponse response2 = client |
| .getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels)); |
| labelsToNodes = response2.getLabelsToNodes(); |
| Assert.assertTrue(labelsToNodes.keySet().containsAll( |
| Arrays.asList(labelX.getName(), labelZ.getName()))); |
| Assert.assertTrue(labelsToNodes.get(labelX.getName()).containsAll( |
| Arrays.asList(node1A))); |
| Assert.assertTrue(labelsToNodes.get(labelZ.getName()).containsAll( |
| Arrays.asList(node1B, node3B))); |
| assertThat(labelsToNodes.get(labelY.getName())).isNull(); |
| } |
| |
| @Test(timeout = 120000) |
| public void testGetClusterNodeAttributes() throws IOException, YarnException { |
| Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null); |
| MockRM rm = new MockRM(newConf) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, |
| this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| } |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); |
| NodeId host1 = NodeId.newInstance("host1", 0); |
| NodeId host2 = NodeId.newInstance("host2", 0); |
| NodeAttribute gpu = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", |
| NodeAttributeType.STRING, "nvida"); |
| NodeAttribute os = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", |
| NodeAttributeType.STRING, "windows64"); |
| NodeAttribute docker = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", |
| NodeAttributeType.STRING, "docker0"); |
| Map<String, Set<NodeAttribute>> nodes = new HashMap<>(); |
| nodes.put(host1.getHost(), ImmutableSet.of(gpu, os)); |
| nodes.put(host2.getHost(), ImmutableSet.of(docker)); |
| mgr.addNodeAttributes(nodes); |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| GetClusterNodeAttributesRequest request = |
| GetClusterNodeAttributesRequest.newInstance(); |
| GetClusterNodeAttributesResponse response = |
| client.getClusterNodeAttributes(request); |
| Set<NodeAttributeInfo> attributes = response.getNodeAttributes(); |
| Assert.assertEquals("Size not correct", 3, attributes.size()); |
| Assert.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(gpu))); |
| Assert.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(os))); |
| Assert |
| .assertTrue(attributes.contains(NodeAttributeInfo.newInstance(docker))); |
| } |
| |
| @Test(timeout = 120000) |
| public void testGetAttributesToNodes() throws IOException, YarnException { |
| Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null); |
| MockRM rm = new MockRM(newConf) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, |
| this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| } |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); |
| String node1 = "host1"; |
| String node2 = "host2"; |
| NodeAttribute gpu = |
| NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", |
| NodeAttributeType.STRING, "nvidia"); |
| NodeAttribute os = |
| NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", |
| NodeAttributeType.STRING, "windows64"); |
| NodeAttribute docker = |
| NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", |
| NodeAttributeType.STRING, "docker0"); |
| NodeAttribute dist = |
| NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION", |
| NodeAttributeType.STRING, "3_0_2"); |
| Map<String, Set<NodeAttribute>> nodes = new HashMap<>(); |
| nodes.put(node1, ImmutableSet.of(gpu, os, dist)); |
| nodes.put(node2, ImmutableSet.of(docker, dist)); |
| mgr.addNodeAttributes(nodes); |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| GetAttributesToNodesRequest request = |
| GetAttributesToNodesRequest.newInstance(); |
| GetAttributesToNodesResponse response = |
| client.getAttributesToNodes(request); |
| Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = |
| response.getAttributesToNodes(); |
| assertThat(response.getAttributesToNodes()).hasSize(4); |
| assertThat(attrs.get(dist.getAttributeKey())).hasSize(2); |
| assertThat(attrs.get(os.getAttributeKey())).hasSize(1); |
| assertThat(attrs.get(gpu.getAttributeKey())).hasSize(1); |
| Assert.assertTrue(findHostnameAndValInMapping(node1, "3_0_2", |
| attrs.get(dist.getAttributeKey()))); |
| Assert.assertTrue(findHostnameAndValInMapping(node2, "3_0_2", |
| attrs.get(dist.getAttributeKey()))); |
| Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0", |
| attrs.get(docker.getAttributeKey()))); |
| |
| GetAttributesToNodesRequest request2 = GetAttributesToNodesRequest |
| .newInstance(ImmutableSet.of(docker.getAttributeKey())); |
| GetAttributesToNodesResponse response2 = |
| client.getAttributesToNodes(request2); |
| Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs2 = |
| response2.getAttributesToNodes(); |
| assertThat(attrs2).hasSize(1); |
| Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0", |
| attrs2.get(docker.getAttributeKey()))); |
| |
| GetAttributesToNodesRequest request3 = |
| GetAttributesToNodesRequest.newInstance( |
| ImmutableSet.of(docker.getAttributeKey(), os.getAttributeKey())); |
| GetAttributesToNodesResponse response3 = |
| client.getAttributesToNodes(request3); |
| Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs3 = |
| response3.getAttributesToNodes(); |
| assertThat(attrs3).hasSize(2); |
| Assert.assertTrue(findHostnameAndValInMapping(node1, "windows64", |
| attrs3.get(os.getAttributeKey()))); |
| Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0", |
| attrs3.get(docker.getAttributeKey()))); |
| } |
| |
| private boolean findHostnameAndValInMapping(String hostname, String attrVal, |
| List<NodeToAttributeValue> mappingVals) { |
| for (NodeToAttributeValue value : mappingVals) { |
| if (value.getHostname().equals(hostname)) { |
| return attrVal.equals(value.getAttributeValue()); |
| } |
| } |
| return false; |
| } |
| |
| @Test(timeout = 120000) |
| public void testGetNodesToAttributes() throws IOException, YarnException { |
| Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null); |
| MockRM rm = new MockRM(newConf) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, |
| this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| } |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); |
| String node1 = "host1"; |
| String node2 = "host2"; |
| NodeAttribute gpu = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", |
| NodeAttributeType.STRING, "nvida"); |
| NodeAttribute os = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", |
| NodeAttributeType.STRING, "windows64"); |
| NodeAttribute docker = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", |
| NodeAttributeType.STRING, "docker0"); |
| NodeAttribute dist = NodeAttribute |
| .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION", |
| NodeAttributeType.STRING, "3_0_2"); |
| Map<String, Set<NodeAttribute>> nodes = new HashMap<>(); |
| nodes.put(node1, ImmutableSet.of(gpu, os, dist)); |
| nodes.put(node2, ImmutableSet.of(docker, dist)); |
| mgr.addNodeAttributes(nodes); |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Specify null for hostnames. |
| GetNodesToAttributesRequest request1 = |
| GetNodesToAttributesRequest.newInstance(null); |
| GetNodesToAttributesResponse response1 = |
| client.getNodesToAttributes(request1); |
| Map<String, Set<NodeAttribute>> hostToAttrs = |
| response1.getNodeToAttributes(); |
| Assert.assertEquals(2, hostToAttrs.size()); |
| |
| Assert.assertTrue(hostToAttrs.get(node2).contains(dist)); |
| Assert.assertTrue(hostToAttrs.get(node2).contains(docker)); |
| Assert.assertTrue(hostToAttrs.get(node1).contains(dist)); |
| |
| // Specify particular node |
| GetNodesToAttributesRequest request2 = |
| GetNodesToAttributesRequest.newInstance(ImmutableSet.of(node1)); |
| GetNodesToAttributesResponse response2 = |
| client.getNodesToAttributes(request2); |
| hostToAttrs = response2.getNodeToAttributes(); |
| Assert.assertEquals(1, response2.getNodeToAttributes().size()); |
| Assert.assertTrue(hostToAttrs.get(node1).contains(dist)); |
| |
| // Test queury with empty set |
| GetNodesToAttributesRequest request3 = |
| GetNodesToAttributesRequest.newInstance(Collections.emptySet()); |
| GetNodesToAttributesResponse response3 = |
| client.getNodesToAttributes(request3); |
| hostToAttrs = response3.getNodeToAttributes(); |
| Assert.assertEquals(2, hostToAttrs.size()); |
| |
| Assert.assertTrue(hostToAttrs.get(node2).contains(dist)); |
| Assert.assertTrue(hostToAttrs.get(node2).contains(docker)); |
| Assert.assertTrue(hostToAttrs.get(node1).contains(dist)); |
| |
| // test invalid hostname |
| GetNodesToAttributesRequest request4 = |
| GetNodesToAttributesRequest.newInstance(ImmutableSet.of("invalid")); |
| GetNodesToAttributesResponse response4 = |
| client.getNodesToAttributes(request4); |
| hostToAttrs = response4.getNodeToAttributes(); |
| Assert.assertEquals(0, hostToAttrs.size()); |
| } |
| |
| @Test(timeout = 120000) |
| public void testUpdatePriorityAndKillAppWithZeroClusterResource() |
| throws Exception { |
| int maxPriority = 10; |
| int appPriority = 5; |
| conf = new YarnConfiguration(); |
| Assume.assumeFalse("FairScheduler does not support Application Priorities", |
| conf.get(YarnConfiguration.RM_SCHEDULER) |
| .equals(FairScheduler.class.getName())); |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, |
| maxPriority); |
| MockRM rm = new MockRM(conf); |
| resourceManager = rm; |
| rm.init(conf); |
| rm.start(); |
| MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder |
| .createWithMemory(1024, rm) |
| .withAppPriority(Priority.newInstance(appPriority)) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| ClientRMService rmService = rm.getClientRMService(); |
| testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority); |
| rm.killApp(app1.getApplicationId()); |
| rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); |
| } |
| |
| @Test(timeout = 120000) |
| public void testUpdateApplicationPriorityRequest() throws Exception { |
| int maxPriority = 10; |
| int appPriority = 5; |
| conf = new YarnConfiguration(); |
| Assume.assumeFalse("FairScheduler does not support Application Priorities", |
| conf.get(YarnConfiguration.RM_SCHEDULER) |
| .equals(FairScheduler.class.getName())); |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, |
| maxPriority); |
| MockRM rm = new MockRM(conf); |
| resourceManager = rm; |
| rm.init(conf); |
| rm.start(); |
| rm.registerNode("host1:1234", 1024); |
| // Start app1 with appPriority 5 |
| MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder |
| .createWithMemory(1024, rm) |
| .withAppPriority(Priority.newInstance(appPriority)) |
| .build(); |
| RMApp app1 = MockRMAppSubmitter.submit(rm, data); |
| |
| Assert.assertEquals("Incorrect priority has been set to application", |
| appPriority, app1.getApplicationPriority().getPriority()); |
| |
| appPriority = 11; |
| ClientRMService rmService = rm.getClientRMService(); |
| testApplicationPriorityUpdation(rmService, app1, appPriority, maxPriority); |
| |
| appPriority = 9; |
| testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority); |
| |
| rm.killApp(app1.getApplicationId()); |
| rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); |
| |
| |
| // Update priority request for invalid application id. |
| ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3); |
| UpdateApplicationPriorityRequest updateRequest = |
| UpdateApplicationPriorityRequest.newInstance(invalidAppId, |
| Priority.newInstance(appPriority)); |
| try { |
| rmService.updateApplicationPriority(updateRequest); |
| Assert.fail("ApplicationNotFoundException should be thrown " |
| + "for invalid application id"); |
| } catch (ApplicationNotFoundException e) { |
| // Expected |
| } |
| |
| updateRequest = |
| UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(), |
| Priority.newInstance(11)); |
| Assert.assertEquals("Incorrect priority has been set to application", |
| appPriority, rmService.updateApplicationPriority(updateRequest) |
| .getApplicationPriority().getPriority()); |
| } |
| |
| private void testApplicationPriorityUpdation(ClientRMService rmService, |
| RMApp app1, int tobeUpdatedPriority, int expected) throws YarnException, |
| IOException { |
| UpdateApplicationPriorityRequest updateRequest = |
| UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(), |
| Priority.newInstance(tobeUpdatedPriority)); |
| |
| UpdateApplicationPriorityResponse updateApplicationPriority = |
| rmService.updateApplicationPriority(updateRequest); |
| |
| Assert.assertEquals("Incorrect priority has been set to application", |
| expected, app1.getApplicationSubmissionContext().getPriority() |
| .getPriority()); |
| Assert.assertEquals("Incorrect priority has been returned", expected, |
| updateApplicationPriority.getApplicationPriority().getPriority()); |
| } |
| |
| private File createExcludeFile(File testDir) throws IOException { |
| File excludeFile = new File(testDir, "excludeFile"); |
| try (FileOutputStream out = new FileOutputStream(excludeFile)) { |
| out.write("decommisssionedHost".getBytes(UTF_8)); |
| } |
| return excludeFile; |
| } |
| |
| @Test |
| public void testRMStartWithDecommissionedNode() throws Exception { |
| File testDir = GenericTestUtils.getRandomizedTestDir(); |
| assertTrue("Failed to create test directory: " + testDir.getAbsolutePath(), testDir.mkdirs()); |
| try { |
| File excludeFile = createExcludeFile(testDir); |
| conf = new YarnConfiguration(); |
| conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, |
| excludeFile.getAbsolutePath()); |
| MockRM rm = new MockRM(conf) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Make call |
| GetClusterNodesRequest request = |
| GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); |
| List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports(); |
| assertEquals(1, nodeReports.size()); |
| } finally { |
| FileUtil.fullyDelete(testDir); |
| } |
| } |
| |
| @Test |
| public void testGetResourceTypesInfoWhenResourceProfileDisabled() |
| throws Exception { |
| conf = new YarnConfiguration(); |
| MockRM rm = new MockRM(conf) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| } |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Make call |
| GetAllResourceTypeInfoRequest request = |
| GetAllResourceTypeInfoRequest.newInstance(); |
| GetAllResourceTypeInfoResponse response = client.getResourceTypeInfo(request); |
| |
| Assert.assertEquals(2, response.getResourceTypeInfo().size()); |
| |
| // Check memory |
| Assert.assertEquals(ResourceInformation.MEMORY_MB.getName(), |
| response.getResourceTypeInfo().get(0).getName()); |
| Assert.assertEquals(ResourceInformation.MEMORY_MB.getUnits(), |
| response.getResourceTypeInfo().get(0).getDefaultUnit()); |
| |
| // Check vcores |
| Assert.assertEquals(ResourceInformation.VCORES.getName(), |
| response.getResourceTypeInfo().get(1).getName()); |
| Assert.assertEquals(ResourceInformation.VCORES.getUnits(), |
| response.getResourceTypeInfo().get(1).getDefaultUnit()); |
| } |
| |
| @Test |
| public void testGetApplicationsWithPerUserApps() |
| throws IOException, YarnException { |
| /* |
| * Submit 3 applications alternately in two queues |
| */ |
| // Basic setup |
| ResourceScheduler scheduler = mockResourceScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(scheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) |
| .getRMTimelineCollectorManager(); |
| |
| RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, |
| mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()) |
| .thenReturn(new EventHandler<Event>() { |
| public void handle(Event event) { |
| } |
| }); |
| |
| // Simulate Queue ACL manager which returns false always |
| QueueACLsManager queueAclsManager = mock(QueueACLsManager.class); |
| when(queueAclsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), any(RMApp.class), any(String.class), |
| anyList())).thenReturn(false); |
| |
| // Simulate app ACL manager which returns false always |
| ApplicationACLsManager appAclsManager = mock(ApplicationACLsManager.class); |
| when(appAclsManager.checkAccess(eq(UserGroupInformation.getCurrentUser()), |
| any(ApplicationAccessType.class), any(String.class), |
| any(ApplicationId.class))).thenReturn(false); |
| ClientRMService rmService = new ClientRMService(rmContext, scheduler, |
| appManager, appAclsManager, queueAclsManager, null); |
| rmService.init(new Configuration()); |
| |
| // Initialize appnames and queues |
| String[] queues = {QUEUE_1, QUEUE_2}; |
| String[] appNames = {MockApps.newAppName(), MockApps.newAppName(), |
| MockApps.newAppName()}; |
| ApplicationId[] appIds = {getApplicationId(101), getApplicationId(102), |
| getApplicationId(103)}; |
| List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3"); |
| |
| long[] submitTimeMillis = new long[3]; |
| // Submit applications |
| for (int i = 0; i < appIds.length; i++) { |
| ApplicationId appId = appIds[i]; |
| SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId, |
| appNames[i], queues[i % queues.length], |
| new HashSet<String>(tags.subList(0, i + 1))); |
| rmService.submitApplication(submitRequest); |
| submitTimeMillis[i] = System.currentTimeMillis(); |
| } |
| |
| // Test different cases of ClientRMService#getApplications() |
| GetApplicationsRequest request = GetApplicationsRequest.newInstance(); |
| assertEquals("Incorrect total number of apps", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| rmService.setDisplayPerUserApps(true); |
| assertEquals("Incorrect number of applications for user", 0, |
| rmService.getApplications(request).getApplicationList().size()); |
| rmService.setDisplayPerUserApps(false); |
| } |
| |
| @Test |
| public void testRegisterNMWithDiffUnits() throws Exception { |
| ResourceUtils.resetResourceTypes(); |
| Configuration yarnConf = new YarnConfiguration(); |
| String resourceTypesFileName = "resource-types-4.xml"; |
| InputStream source = |
| yarnConf.getClassLoader().getResourceAsStream(resourceTypesFileName); |
| resourceTypesFile = new File(yarnConf.getClassLoader(). |
| getResource(".").getPath(), "resource-types.xml"); |
| FileUtils.copyInputStreamToFile(source, resourceTypesFile); |
| ResourceUtils.getResourceTypes(); |
| |
| yarnConf.setClass( |
| CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, |
| DominantResourceCalculator.class, ResourceCalculator.class); |
| |
| MockRM rm = new MockRM(yarnConf) { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| Resource resource = Resources.createResource(976562); |
| resource.setResourceInformation("memory-mb", |
| ResourceInformation.newInstance("memory-mb", "G", 976562)); |
| resource.setResourceInformation("resource1", |
| ResourceInformation.newInstance("resource1", "T", 1)); |
| resource.setResourceInformation("resource2", |
| ResourceInformation.newInstance("resource2", "M", 1)); |
| |
| MockNM node = rm.registerNode("host1:1234", resource); |
| node.nodeHeartbeat(true); |
| |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Make call |
| GetClusterNodesRequest request = |
| GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING)); |
| List<NodeReport> nodeReports = |
| client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(1, nodeReports.size()); |
| Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY, |
| nodeReports.get(0).getNodeState()); |
| Assert.assertEquals(1, nodeReports.size()); |
| |
| //Resource 'resource1' has been passed as 1T while registering NM. |
| //1T should be converted to 1000G |
| Assert.assertEquals("G", nodeReports.get(0).getCapability(). |
| getResourceInformation("resource1").getUnits()); |
| Assert.assertEquals(1000, nodeReports.get(0).getCapability(). |
| getResourceInformation("resource1").getValue()); |
| |
| //Resource 'resource2' has been passed as 1M while registering NM |
| //1M should be converted to 1000000000M |
| Assert.assertEquals("m", nodeReports.get(0).getCapability(). |
| getResourceInformation("resource2").getUnits()); |
| Assert.assertEquals(1000000000, nodeReports.get(0).getCapability(). |
| getResourceInformation("resource2").getValue()); |
| |
| //Resource 'memory-mb' has been passed as 976562G while registering NM |
| //976562G should be converted to 976562Mi |
| Assert.assertEquals("Mi", nodeReports.get(0).getCapability(). |
| getResourceInformation("memory-mb").getUnits()); |
| Assert.assertEquals(976562, nodeReports.get(0).getCapability(). |
| getResourceInformation("memory-mb").getValue()); |
| } |
| |
| @Test |
| public void testGetClusterMetrics() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| resourceManager = rm; |
| rm.start(); |
| |
| ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); |
| clusterMetrics.incrDecommissioningNMs(); |
| repeat(2, clusterMetrics::incrDecommisionedNMs); |
| repeat(3, clusterMetrics::incrNumActiveNodes); |
| repeat(4, clusterMetrics::incrNumLostNMs); |
| repeat(5, clusterMetrics::incrNumUnhealthyNMs); |
| repeat(6, clusterMetrics::incrNumRebootedNMs); |
| repeat(7, clusterMetrics::incrNumShutdownNMs); |
| |
| // Create a client. |
| conf = new Configuration(); |
| rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| client = (ApplicationClientProtocol) rpc.getProxy( |
| ApplicationClientProtocol.class, rmAddress, conf); |
| |
| YarnClusterMetrics ymetrics = client.getClusterMetrics( |
| GetClusterMetricsRequest.newInstance()).getClusterMetrics(); |
| |
| Assert.assertEquals(0, ymetrics.getNumNodeManagers()); |
| Assert.assertEquals(1, ymetrics.getNumDecommissioningNodeManagers()); |
| Assert.assertEquals(2, ymetrics.getNumDecommissionedNodeManagers()); |
| Assert.assertEquals(3, ymetrics.getNumActiveNodeManagers()); |
| Assert.assertEquals(4, ymetrics.getNumLostNodeManagers()); |
| Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers()); |
| Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers()); |
| Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers()); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if (resourceTypesFile != null && resourceTypesFile.exists()) { |
| resourceTypesFile.delete(); |
| } |
| ClusterMetrics.destroy(); |
| DefaultMetricsSystem.shutdown(); |
| if (conf != null && client != null && rpc != null) { |
| rpc.stopProxy(client, conf); |
| } |
| if (resourceManager != null) { |
| resourceManager.close(); |
| } |
| } |
| |
| private static void repeat(int n, Runnable r) { |
| for (int i = 0; i < n; ++i) { |
| r.run(); |
| } |
| } |
| } |