| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyBoolean; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CyclicBarrier; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authentication.util.KerberosName; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.MockApps; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; |
| import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeLabel; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.ReservationDefinition; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.ReservationRequest; |
| import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; |
| import org.apache.hadoop.yarn.api.records.ReservationRequests; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.Event; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; |
| import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; |
| import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hadoop.yarn.util.UTCClock; |
| import org.apache.hadoop.yarn.util.resource.ResourceCalculator; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Sets; |
| |
| public class TestClientRMService { |
| |
| private static final Log LOG = LogFactory.getLog(TestClientRMService.class); |
| |
| private RecordFactory recordFactory = RecordFactoryProvider |
| .getRecordFactory(null); |
| |
| private String appType = "MockApp"; |
| |
| private static RMDelegationTokenSecretManager dtsm; |
| |
| private final static String QUEUE_1 = "Q-1"; |
| private final static String QUEUE_2 = "Q-2"; |
| private final static String kerberosRule = "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT"; |
| static { |
| KerberosName.setRules(kerberosRule); |
| } |
| |
| @BeforeClass |
| public static void setupSecretManager() throws IOException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getStateStore()).thenReturn(new NullRMStateStore()); |
| dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext); |
| dtsm.startThreads(); |
| } |
| |
| @AfterClass |
| public static void teardownSecretManager() { |
| if (dtsm != null) { |
| dtsm.stopThreads(); |
| } |
| } |
| |
| @Test |
| public void testGetClusterNodes() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, |
| this.getRMContext().getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| rm.start(); |
| RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); |
| labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); |
| |
| // Add a healthy node with label = x |
| MockNM node = rm.registerNode("host1:1234", 1024); |
| Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); |
| map.put(node.getNodeId(), ImmutableSet.of("x")); |
| labelsMgr.replaceLabelsOnNode(map); |
| rm.sendNodeStarted(node); |
| node.nodeHeartbeat(true); |
| |
| // Add and lose a node with label = y |
| MockNM lostNode = rm.registerNode("host2:1235", 1024); |
| rm.sendNodeStarted(lostNode); |
| lostNode.nodeHeartbeat(true); |
| rm.NMwaitForState(lostNode.getNodeId(), NodeState.RUNNING); |
| rm.sendNodeLost(lostNode); |
| |
| // Create a client. |
| Configuration conf = new Configuration(); |
| YarnRPC rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| ApplicationClientProtocol client = |
| (ApplicationClientProtocol) rpc |
| .getProxy(ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Make call |
| GetClusterNodesRequest request = |
| GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING)); |
| List<NodeReport> nodeReports = |
| client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(1, nodeReports.size()); |
| Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY, |
| nodeReports.get(0).getNodeState()); |
| |
| // Check node's label = x |
| Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); |
| |
| // Now make the node unhealthy. |
| node.nodeHeartbeat(false); |
| |
| // Call again |
| nodeReports = client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals("Unhealthy nodes should not show up by default", 0, |
| nodeReports.size()); |
| |
| // Change label of host1 to y |
| map = new HashMap<NodeId, Set<String>>(); |
| map.put(node.getNodeId(), ImmutableSet.of("y")); |
| labelsMgr.replaceLabelsOnNode(map); |
| |
| // Now query for UNHEALTHY nodes |
| request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY)); |
| nodeReports = client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(1, nodeReports.size()); |
| Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, |
| nodeReports.get(0).getNodeState()); |
| |
| Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); |
| |
| // Remove labels of host1 |
| map = new HashMap<NodeId, Set<String>>(); |
| map.put(node.getNodeId(), ImmutableSet.of("y")); |
| labelsMgr.removeLabelsFromNode(map); |
| |
| // Query all states should return all nodes |
| rm.registerNode("host3:1236", 1024); |
| request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); |
| nodeReports = client.getClusterNodes(request).getNodeReports(); |
| Assert.assertEquals(3, nodeReports.size()); |
| |
| // All host1-3's label should be empty (instead of null) |
| for (NodeReport report : nodeReports) { |
| Assert.assertTrue(report.getNodeLabels() != null |
| && report.getNodeLabels().isEmpty()); |
| } |
| |
| rpc.stopProxy(client, conf); |
| rm.close(); |
| } |
| |
| @Test |
| public void testNonExistingApplicationReport() throws YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn( |
| new ConcurrentHashMap<ApplicationId, RMApp>()); |
| ClientRMService rmService = new ClientRMService(rmContext, null, null, |
| null, null, null); |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| GetApplicationReportRequest request = recordFactory |
| .newRecordInstance(GetApplicationReportRequest.class); |
| request.setApplicationId(ApplicationId.newInstance(0, 0)); |
| try { |
| rmService.getApplicationReport(request); |
| Assert.fail(); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.assertEquals(ex.getMessage(), |
| "Application with id '" + request.getApplicationId() |
| + "' doesn't exist in RM."); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationReport() throws Exception { |
| YarnScheduler yarnScheduler = mock(YarnScheduler.class); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| |
| ApplicationId appId1 = getApplicationId(1); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| when( |
| mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); |
| |
| ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, |
| null, mockAclsManager, null, null); |
| try { |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| GetApplicationReportRequest request = recordFactory |
| .newRecordInstance(GetApplicationReportRequest.class); |
| request.setApplicationId(appId1); |
| GetApplicationReportResponse response = |
| rmService.getApplicationReport(request); |
| ApplicationReport report = response.getApplicationReport(); |
| ApplicationResourceUsageReport usageReport = |
| report.getApplicationResourceUsageReport(); |
| Assert.assertEquals(10, usageReport.getMemorySeconds()); |
| Assert.assertEquals(3, usageReport.getVcoreSeconds()); |
| } finally { |
| rmService.close(); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationAttemptReport() throws YarnException, |
| IOException { |
| ClientRMService rmService = createRMService(); |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| GetApplicationAttemptReportRequest request = recordFactory |
| .newRecordInstance(GetApplicationAttemptReportRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| request.setApplicationAttemptId(attemptId); |
| |
| try { |
| GetApplicationAttemptReportResponse response = rmService |
| .getApplicationAttemptReport(request); |
| Assert.assertEquals(attemptId, response.getApplicationAttemptReport() |
| .getApplicationAttemptId()); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testGetApplicationResourceUsageReportDummy() throws YarnException, |
| IOException { |
| ApplicationAttemptId attemptId = getApplicationAttemptId(1); |
| YarnScheduler yarnScheduler = mockYarnScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) { |
| } |
| }); |
| ApplicationSubmissionContext asContext = |
| mock(ApplicationSubmissionContext.class); |
| YarnConfiguration config = new YarnConfiguration(); |
| RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, |
| rmContext, yarnScheduler, null, asContext, config, false, null); |
| ApplicationResourceUsageReport report = rmAppAttemptImpl |
| .getApplicationResourceUsageReport(); |
| assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); |
| } |
| |
| @Test |
| public void testGetApplicationAttempts() throws YarnException, IOException { |
| ClientRMService rmService = createRMService(); |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| GetApplicationAttemptsRequest request = recordFactory |
| .newRecordInstance(GetApplicationAttemptsRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| request.setApplicationId(ApplicationId.newInstance(123456, 1)); |
| |
| try { |
| GetApplicationAttemptsResponse response = rmService |
| .getApplicationAttempts(request); |
| Assert.assertEquals(1, response.getApplicationAttemptList().size()); |
| Assert.assertEquals(attemptId, response.getApplicationAttemptList() |
| .get(0).getApplicationAttemptId()); |
| |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testGetContainerReport() throws YarnException, IOException { |
| ClientRMService rmService = createRMService(); |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| GetContainerReportRequest request = recordFactory |
| .newRecordInstance(GetContainerReportRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| ContainerId containerId = ContainerId.newContainerId(attemptId, 1); |
| request.setContainerId(containerId); |
| |
| try { |
| GetContainerReportResponse response = rmService |
| .getContainerReport(request); |
| Assert.assertEquals(containerId, response.getContainerReport() |
| .getContainerId()); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| @Test |
| public void testGetContainers() throws YarnException, IOException { |
| ClientRMService rmService = createRMService(); |
| RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| GetContainersRequest request = recordFactory |
| .newRecordInstance(GetContainersRequest.class); |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| ContainerId containerId = ContainerId.newContainerId(attemptId, 1); |
| request.setApplicationAttemptId(attemptId); |
| try { |
| GetContainersResponse response = rmService.getContainers(request); |
| Assert.assertEquals(containerId, response.getContainerList().get(0) |
| .getContainerId()); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.fail(ex.getMessage()); |
| } |
| } |
| |
| public ClientRMService createRMService() throws IOException { |
| YarnScheduler yarnScheduler = mockYarnScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, |
| yarnScheduler); |
| when(rmContext.getRMApps()).thenReturn(apps); |
| when(rmContext.getYarnConfiguration()).thenReturn(new Configuration()); |
| RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null, |
| mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) { |
| } |
| }); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when( |
| mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), anyString())).thenReturn(true); |
| return new ClientRMService(rmContext, yarnScheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| } |
| |
| @Test |
| public void testForceKillNonExistingApplication() throws YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn( |
| new ConcurrentHashMap<ApplicationId, RMApp>()); |
| ClientRMService rmService = new ClientRMService(rmContext, null, null, |
| null, null, null); |
| ApplicationId applicationId = |
| BuilderUtils.newApplicationId(System.currentTimeMillis(), 0); |
| KillApplicationRequest request = |
| KillApplicationRequest.newInstance(applicationId); |
| try { |
| rmService.forceKillApplication(request); |
| Assert.fail(); |
| } catch (ApplicationNotFoundException ex) { |
| Assert.assertEquals(ex.getMessage(), |
| "Trying to kill an absent " + |
| "application " + request.getApplicationId()); |
| } |
| } |
| |
| @Test |
| public void testForceKillApplication() throws Exception { |
| YarnConfiguration conf = new YarnConfiguration(); |
| MockRM rm = new MockRM(); |
| rm.init(conf); |
| rm.start(); |
| |
| ClientRMService rmService = rm.getClientRMService(); |
| GetApplicationsRequest getRequest = GetApplicationsRequest.newInstance( |
| EnumSet.of(YarnApplicationState.KILLED)); |
| |
| RMApp app1 = rm.submitApp(1024); |
| RMApp app2 = rm.submitApp(1024, true); |
| |
| assertEquals("Incorrect number of apps in the RM", 0, |
| rmService.getApplications(getRequest).getApplicationList().size()); |
| |
| KillApplicationRequest killRequest1 = |
| KillApplicationRequest.newInstance(app1.getApplicationId()); |
| KillApplicationRequest killRequest2 = |
| KillApplicationRequest.newInstance(app2.getApplicationId()); |
| |
| int killAttemptCount = 0; |
| for (int i = 0; i < 100; i++) { |
| KillApplicationResponse killResponse1 = |
| rmService.forceKillApplication(killRequest1); |
| killAttemptCount++; |
| if (killResponse1.getIsKillCompleted()) { |
| break; |
| } |
| Thread.sleep(10); |
| } |
| assertTrue("Kill attempt count should be greater than 1 for managed AMs", |
| killAttemptCount > 1); |
| assertEquals("Incorrect number of apps in the RM", 1, |
| rmService.getApplications(getRequest).getApplicationList().size()); |
| |
| KillApplicationResponse killResponse2 = |
| rmService.forceKillApplication(killRequest2); |
| assertTrue("Killing UnmanagedAM should falsely acknowledge true", |
| killResponse2.getIsKillCompleted()); |
| for (int i = 0; i < 100; i++) { |
| if (2 == |
| rmService.getApplications(getRequest).getApplicationList().size()) { |
| break; |
| } |
| Thread.sleep(10); |
| } |
| assertEquals("Incorrect number of apps in the RM", 2, |
| rmService.getApplications(getRequest).getApplicationList().size()); |
| } |
| |
| @Test (expected = ApplicationNotFoundException.class) |
| public void testMoveAbsentApplication() throws YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| when(rmContext.getRMApps()).thenReturn( |
| new ConcurrentHashMap<ApplicationId, RMApp>()); |
| ClientRMService rmService = new ClientRMService(rmContext, null, null, |
| null, null, null); |
| ApplicationId applicationId = |
| BuilderUtils.newApplicationId(System.currentTimeMillis(), 0); |
| MoveApplicationAcrossQueuesRequest request = |
| MoveApplicationAcrossQueuesRequest.newInstance(applicationId, "newqueue"); |
| rmService.moveApplicationAcrossQueues(request); |
| } |
| |
| @Test |
| public void testGetQueueInfo() throws Exception { |
| YarnScheduler yarnScheduler = mock(YarnScheduler.class); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), anyString())).thenReturn(true); |
| when(mockAclsManager.checkAccess(any(UserGroupInformation.class), |
| any(ApplicationAccessType.class), anyString(), |
| any(ApplicationId.class))).thenReturn(true); |
| |
| ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, |
| null, mockAclsManager, mockQueueACLsManager, null); |
| GetQueueInfoRequest request = recordFactory |
| .newRecordInstance(GetQueueInfoRequest.class); |
| request.setQueueName("testqueue"); |
| request.setIncludeApplications(true); |
| GetQueueInfoResponse queueInfo = rmService.getQueueInfo(request); |
| List<ApplicationReport> applications = queueInfo.getQueueInfo() |
| .getApplications(); |
| Assert.assertEquals(2, applications.size()); |
| request.setQueueName("nonexistentqueue"); |
| request.setIncludeApplications(true); |
| // should not throw exception on nonexistent queue |
| queueInfo = rmService.getQueueInfo(request); |
| |
| // Case where user does not have application access |
| ApplicationACLsManager mockAclsManager1 = |
| mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager1 = |
| mock(QueueACLsManager.class); |
| when(mockQueueACLsManager1.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), anyString())).thenReturn(false); |
| when(mockAclsManager1.checkAccess(any(UserGroupInformation.class), |
| any(ApplicationAccessType.class), anyString(), |
| any(ApplicationId.class))).thenReturn(false); |
| |
| ClientRMService rmService1 = new ClientRMService(rmContext, yarnScheduler, |
| null, mockAclsManager1, mockQueueACLsManager1, null); |
| request.setQueueName("testqueue"); |
| request.setIncludeApplications(true); |
| GetQueueInfoResponse queueInfo1 = rmService1.getQueueInfo(request); |
| List<ApplicationReport> applications1 = queueInfo1.getQueueInfo() |
| .getApplications(); |
| Assert.assertEquals(0, applications1.size()); |
| } |
| |
| private static final UserGroupInformation owner = |
| UserGroupInformation.createRemoteUser("owner"); |
| private static final UserGroupInformation other = |
| UserGroupInformation.createRemoteUser("other"); |
| private static final UserGroupInformation tester = |
| UserGroupInformation.createRemoteUser("tester"); |
| private static final String testerPrincipal = "tester@EXAMPLE.COM"; |
| private static final String ownerPrincipal = "owner@EXAMPLE.COM"; |
| private static final String otherPrincipal = "other@EXAMPLE.COM"; |
| private static final UserGroupInformation testerKerb = |
| UserGroupInformation.createRemoteUser(testerPrincipal); |
| private static final UserGroupInformation ownerKerb = |
| UserGroupInformation.createRemoteUser(ownerPrincipal); |
| private static final UserGroupInformation otherKerb = |
| UserGroupInformation.createRemoteUser(otherPrincipal); |
| |
| @Test |
| public void testTokenRenewalByOwner() throws Exception { |
| owner.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| checkTokenRenewal(owner, owner); |
| return null; |
| } |
| }); |
| } |
| |
| @Test |
| public void testTokenRenewalWrongUser() throws Exception { |
| try { |
| owner.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| try { |
| checkTokenRenewal(owner, other); |
| return null; |
| } catch (YarnException ex) { |
| Assert.assertTrue(ex.getMessage().contains(owner.getUserName() + |
| " tries to renew a token with renewer " + |
| other.getUserName())); |
| throw ex; |
| } |
| } |
| }); |
| } catch (Exception e) { |
| return; |
| } |
| Assert.fail("renew should have failed"); |
| } |
| |
| @Test |
| public void testTokenRenewalByLoginUser() throws Exception { |
| UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| checkTokenRenewal(owner, owner); |
| checkTokenRenewal(owner, other); |
| return null; |
| } |
| }); |
| } |
| |
| private void checkTokenRenewal(UserGroupInformation owner, |
| UserGroupInformation renewer) throws IOException, YarnException { |
| RMDelegationTokenIdentifier tokenIdentifier = |
| new RMDelegationTokenIdentifier( |
| new Text(owner.getUserName()), new Text(renewer.getUserName()), null); |
| Token<?> token = |
| new Token<RMDelegationTokenIdentifier>(tokenIdentifier, dtsm); |
| org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils.newDelegationToken( |
| token.getIdentifier(), token.getKind().toString(), |
| token.getPassword(), token.getService().toString()); |
| RenewDelegationTokenRequest request = |
| Records.newRecord(RenewDelegationTokenRequest.class); |
| request.setDelegationToken(dToken); |
| |
| RMContext rmContext = mock(RMContext.class); |
| ClientRMService rmService = new ClientRMService( |
| rmContext, null, null, null, null, dtsm); |
| rmService.renewDelegationToken(request); |
| } |
| |
| @Test |
| public void testTokenCancellationByOwner() throws Exception { |
| // two tests required - one with a kerberos name |
| // and with a short name |
| RMContext rmContext = mock(RMContext.class); |
| final ClientRMService rmService = |
| new ClientRMService(rmContext, null, null, null, null, dtsm); |
| testerKerb.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| checkTokenCancellation(rmService, testerKerb, other); |
| return null; |
| } |
| }); |
| owner.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| checkTokenCancellation(owner, other); |
| return null; |
| } |
| }); |
| } |
| |
| @Test |
| public void testTokenCancellationByRenewer() throws Exception { |
| // two tests required - one with a kerberos name |
| // and with a short name |
| RMContext rmContext = mock(RMContext.class); |
| final ClientRMService rmService = |
| new ClientRMService(rmContext, null, null, null, null, dtsm); |
| testerKerb.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| checkTokenCancellation(rmService, owner, testerKerb); |
| return null; |
| } |
| }); |
| other.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| checkTokenCancellation(owner, other); |
| return null; |
| } |
| }); |
| } |
| |
| @Test |
| public void testTokenCancellationByWrongUser() { |
| // two sets to test - |
| // 1. try to cancel tokens of short and kerberos users as a kerberos UGI |
| // 2. try to cancel tokens of short and kerberos users as a simple auth UGI |
| |
| RMContext rmContext = mock(RMContext.class); |
| final ClientRMService rmService = |
| new ClientRMService(rmContext, null, null, null, null, dtsm); |
| UserGroupInformation[] kerbTestOwners = |
| { owner, other, tester, ownerKerb, otherKerb }; |
| UserGroupInformation[] kerbTestRenewers = |
| { owner, other, ownerKerb, otherKerb }; |
| for (final UserGroupInformation tokOwner : kerbTestOwners) { |
| for (final UserGroupInformation tokRenewer : kerbTestRenewers) { |
| try { |
| testerKerb.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| try { |
| checkTokenCancellation(rmService, tokOwner, tokRenewer); |
| Assert.fail("We should not reach here; token owner = " |
| + tokOwner.getUserName() + ", renewer = " |
| + tokRenewer.getUserName()); |
| return null; |
| } catch (YarnException e) { |
| Assert.assertTrue(e.getMessage().contains( |
| testerKerb.getUserName() |
| + " is not authorized to cancel the token")); |
| return null; |
| } |
| } |
| }); |
| } catch (Exception e) { |
| Assert.fail("Unexpected exception; " + e.getMessage()); |
| } |
| } |
| } |
| |
| UserGroupInformation[] simpleTestOwners = |
| { owner, other, ownerKerb, otherKerb, testerKerb }; |
| UserGroupInformation[] simpleTestRenewers = |
| { owner, other, ownerKerb, otherKerb }; |
| for (final UserGroupInformation tokOwner : simpleTestOwners) { |
| for (final UserGroupInformation tokRenewer : simpleTestRenewers) { |
| try { |
| tester.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| try { |
| checkTokenCancellation(tokOwner, tokRenewer); |
| Assert.fail("We should not reach here; token owner = " |
| + tokOwner.getUserName() + ", renewer = " |
| + tokRenewer.getUserName()); |
| return null; |
| } catch (YarnException ex) { |
| Assert.assertTrue(ex.getMessage().contains( |
| tester.getUserName() |
| + " is not authorized to cancel the token")); |
| return null; |
| } |
| } |
| }); |
| } catch (Exception e) { |
| Assert.fail("Unexpected exception; " + e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| private void checkTokenCancellation(UserGroupInformation owner, |
| UserGroupInformation renewer) throws IOException, YarnException { |
| RMContext rmContext = mock(RMContext.class); |
| final ClientRMService rmService = |
| new ClientRMService(rmContext, null, null, null, null, dtsm); |
| checkTokenCancellation(rmService, owner, renewer); |
| } |
| |
| private void checkTokenCancellation(ClientRMService rmService, |
| UserGroupInformation owner, UserGroupInformation renewer) |
| throws IOException, YarnException { |
| RMDelegationTokenIdentifier tokenIdentifier = |
| new RMDelegationTokenIdentifier(new Text(owner.getUserName()), |
| new Text(renewer.getUserName()), null); |
| Token<?> token = |
| new Token<RMDelegationTokenIdentifier>(tokenIdentifier, dtsm); |
| org.apache.hadoop.yarn.api.records.Token dToken = |
| BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind() |
| .toString(), token.getPassword(), token.getService().toString()); |
| CancelDelegationTokenRequest request = |
| Records.newRecord(CancelDelegationTokenRequest.class); |
| request.setDelegationToken(dToken); |
| rmService.cancelDelegationToken(request); |
| } |
| |
| @Test (timeout = 30000) |
| @SuppressWarnings ("rawtypes") |
| public void testAppSubmit() throws Exception { |
| YarnScheduler yarnScheduler = mockYarnScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) {} |
| }); |
| ApplicationId appId1 = getApplicationId(100); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| when( |
| mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); |
| |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), anyString())).thenReturn(true); |
| ClientRMService rmService = |
| new ClientRMService(rmContext, yarnScheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| |
| // without name and queue |
| |
| SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( |
| appId1, null, null); |
| try { |
| rmService.submitApplication(submitRequest1); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app1 = rmContext.getRMApps().get(appId1); |
| Assert.assertNotNull("app doesn't exist", app1); |
| Assert.assertEquals("app name doesn't match", |
| YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName()); |
| Assert.assertEquals("app queue doesn't match", |
| YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue()); |
| |
| // with name and queue |
| String name = MockApps.newAppName(); |
| String queue = MockApps.newQueue(); |
| ApplicationId appId2 = getApplicationId(101); |
| SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( |
| appId2, name, queue); |
| submitRequest2.getApplicationSubmissionContext().setApplicationType( |
| "matchType"); |
| try { |
| rmService.submitApplication(submitRequest2); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| RMApp app2 = rmContext.getRMApps().get(appId2); |
| Assert.assertNotNull("app doesn't exist", app2); |
| Assert.assertEquals("app name doesn't match", name, app2.getName()); |
| Assert.assertEquals("app queue doesn't match", queue, app2.getQueue()); |
| |
| // duplicate appId |
| try { |
| rmService.submitApplication(submitRequest2); |
| } catch (YarnException e) { |
| Assert.fail("Exception is not expected."); |
| } |
| |
| GetApplicationsRequest getAllAppsRequest = |
| GetApplicationsRequest.newInstance(new HashSet<String>()); |
| GetApplicationsResponse getAllApplicationsResponse = |
| rmService.getApplications(getAllAppsRequest); |
| Assert.assertEquals(5, |
| getAllApplicationsResponse.getApplicationList().size()); |
| |
| Set<String> appTypes = new HashSet<String>(); |
| appTypes.add("matchType"); |
| |
| getAllAppsRequest = GetApplicationsRequest.newInstance(appTypes); |
| getAllApplicationsResponse = |
| rmService.getApplications(getAllAppsRequest); |
| Assert.assertEquals(1, |
| getAllApplicationsResponse.getApplicationList().size()); |
| Assert.assertEquals(appId2, |
| getAllApplicationsResponse.getApplicationList() |
| .get(0).getApplicationId()); |
| } |
| |
| @Test |
| public void testGetApplications() throws IOException, YarnException { |
| /** |
| * 1. Submit 3 applications alternately in two queues |
| * 2. Test each of the filters |
| */ |
| // Basic setup |
| YarnScheduler yarnScheduler = mockYarnScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn( |
| new EventHandler<Event>() { |
| public void handle(Event event) {} |
| }); |
| |
| ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); |
| QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); |
| when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), |
| any(QueueACL.class), anyString())).thenReturn(true); |
| ClientRMService rmService = |
| new ClientRMService(rmContext, yarnScheduler, appManager, |
| mockAclsManager, mockQueueACLsManager, null); |
| |
| // Initialize appnames and queues |
| String[] queues = {QUEUE_1, QUEUE_2}; |
| String[] appNames = |
| {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()}; |
| ApplicationId[] appIds = |
| {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; |
| List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3"); |
| |
| long[] submitTimeMillis = new long[3]; |
| // Submit applications |
| for (int i = 0; i < appIds.length; i++) { |
| ApplicationId appId = appIds[i]; |
| when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), |
| ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true); |
| SubmitApplicationRequest submitRequest = mockSubmitAppRequest( |
| appId, appNames[i], queues[i % queues.length], |
| new HashSet<String>(tags.subList(0, i + 1))); |
| rmService.submitApplication(submitRequest); |
| submitTimeMillis[i] = System.currentTimeMillis(); |
| } |
| |
| // Test different cases of ClientRMService#getApplications() |
| GetApplicationsRequest request = GetApplicationsRequest.newInstance(); |
| assertEquals("Incorrect total number of apps", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check limit |
| request.setLimit(1L); |
| assertEquals("Failed to limit applications", 1, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check start range |
| request = GetApplicationsRequest.newInstance(); |
| request.setStartRange(submitTimeMillis[0], System.currentTimeMillis()); |
| |
| // 2 applications are submitted after first timeMills |
| assertEquals("Incorrect number of matching start range", |
| 2, rmService.getApplications(request).getApplicationList().size()); |
| |
| // 1 application is submitted after the second timeMills |
| request.setStartRange(submitTimeMillis[1], System.currentTimeMillis()); |
| assertEquals("Incorrect number of matching start range", |
| 1, rmService.getApplications(request).getApplicationList().size()); |
| |
| // no application is submitted after the third timeMills |
| request.setStartRange(submitTimeMillis[2], System.currentTimeMillis()); |
| assertEquals("Incorrect number of matching start range", |
| 0, rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check queue |
| request = GetApplicationsRequest.newInstance(); |
| Set<String> queueSet = new HashSet<String>(); |
| request.setQueues(queueSet); |
| |
| queueSet.add(queues[0]); |
| assertEquals("Incorrect number of applications in queue", 2, |
| rmService.getApplications(request).getApplicationList().size()); |
| assertEquals("Incorrect number of applications in queue", 2, |
| rmService.getApplications(request, false).getApplicationList().size()); |
| |
| queueSet.add(queues[1]); |
| assertEquals("Incorrect number of applications in queue", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check user |
| request = GetApplicationsRequest.newInstance(); |
| Set<String> userSet = new HashSet<String>(); |
| request.setUsers(userSet); |
| |
| userSet.add("random-user-name"); |
| assertEquals("Incorrect number of applications for user", 0, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| userSet.add(UserGroupInformation.getCurrentUser().getShortUserName()); |
| assertEquals("Incorrect number of applications for user", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check tags |
| request = GetApplicationsRequest.newInstance( |
| ApplicationsRequestScope.ALL, null, null, null, null, null, null, |
| null, null); |
| Set<String> tagSet = new HashSet<String>(); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| tagSet = Sets.newHashSet(tags.get(0)); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| tagSet = Sets.newHashSet(tags.get(1)); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 2, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| tagSet = Sets.newHashSet(tags.get(2)); |
| request.setApplicationTags(tagSet); |
| assertEquals("Incorrect number of matching tags", 1, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| // Check scope |
| request = GetApplicationsRequest.newInstance( |
| ApplicationsRequestScope.VIEWABLE); |
| assertEquals("Incorrect number of applications for the scope", 6, |
| rmService.getApplications(request).getApplicationList().size()); |
| |
| request = GetApplicationsRequest.newInstance( |
| ApplicationsRequestScope.OWN); |
| assertEquals("Incorrect number of applications for the scope", 3, |
| rmService.getApplications(request).getApplicationList().size()); |
| } |
| |
| @Test(timeout=4000) |
| public void testConcurrentAppSubmit() |
| throws IOException, InterruptedException, BrokenBarrierException, |
| YarnException { |
| YarnScheduler yarnScheduler = mockYarnScheduler(); |
| RMContext rmContext = mock(RMContext.class); |
| mockRMContext(yarnScheduler, rmContext); |
| RMStateStore stateStore = mock(RMStateStore.class); |
| when(rmContext.getStateStore()).thenReturn(stateStore); |
| RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, |
| null, mock(ApplicationACLsManager.class), new Configuration()); |
| |
| final ApplicationId appId1 = getApplicationId(100); |
| final ApplicationId appId2 = getApplicationId(101); |
| final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest( |
| appId1, null, null); |
| final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( |
| appId2, null, null); |
| |
| final CyclicBarrier startBarrier = new CyclicBarrier(2); |
| final CyclicBarrier endBarrier = new CyclicBarrier(2); |
| |
| @SuppressWarnings("rawtypes") |
| EventHandler eventHandler = new EventHandler() { |
| @Override |
| public void handle(Event rawEvent) { |
| if (rawEvent instanceof RMAppEvent) { |
| RMAppEvent event = (RMAppEvent) rawEvent; |
| if (event.getApplicationId().equals(appId1)) { |
| try { |
| startBarrier.await(); |
| endBarrier.await(); |
| } catch (BrokenBarrierException e) { |
| LOG.warn("Broken Barrier", e); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while awaiting barriers", e); |
| } |
| } |
| } |
| } |
| }; |
| |
| when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler); |
| |
| final ClientRMService rmService = |
| new ClientRMService(rmContext, yarnScheduler, appManager, null, null, |
| null); |
| |
| // submit an app and wait for it to block while in app submission |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| try { |
| rmService.submitApplication(submitRequest1); |
| } catch (YarnException e) {} |
| } |
| }; |
| t.start(); |
| |
| // submit another app, so go through while the first app is blocked |
| startBarrier.await(); |
| rmService.submitApplication(submitRequest2); |
| endBarrier.await(); |
| t.join(); |
| } |
| |
| private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, |
| String name, String queue) { |
| return mockSubmitAppRequest(appId, name, queue, null); |
| } |
| |
| private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, |
| String name, String queue, Set<String> tags) { |
| return mockSubmitAppRequest(appId, name, queue, tags, false); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId, |
| String name, String queue, Set<String> tags, boolean unmanaged) { |
| |
| ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); |
| |
| Resource resource = Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); |
| |
| ApplicationSubmissionContext submissionContext = |
| recordFactory.newRecordInstance(ApplicationSubmissionContext.class); |
| submissionContext.setAMContainerSpec(amContainerSpec); |
| submissionContext.setApplicationName(name); |
| submissionContext.setQueue(queue); |
| submissionContext.setApplicationId(appId); |
| submissionContext.setResource(resource); |
| submissionContext.setApplicationType(appType); |
| submissionContext.setApplicationTags(tags); |
| submissionContext.setUnmanagedAM(unmanaged); |
| |
| SubmitApplicationRequest submitRequest = |
| recordFactory.newRecordInstance(SubmitApplicationRequest.class); |
| submitRequest.setApplicationSubmissionContext(submissionContext); |
| return submitRequest; |
| } |
| |
| private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) |
| throws IOException { |
| Dispatcher dispatcher = mock(Dispatcher.class); |
| when(rmContext.getDispatcher()).thenReturn(dispatcher); |
| EventHandler eventHandler = mock(EventHandler.class); |
| when(dispatcher.getEventHandler()).thenReturn(eventHandler); |
| QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class); |
| queInfo.setQueueName("testqueue"); |
| when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean())) |
| .thenReturn(queInfo); |
| when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean())) |
| .thenThrow(new IOException("queue does not exist")); |
| RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); |
| when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); |
| SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); |
| when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); |
| when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); |
| ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, |
| yarnScheduler); |
| when(rmContext.getRMApps()).thenReturn(apps); |
| when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn( |
| getSchedulerApps(apps)); |
| ResourceScheduler rs = mock(ResourceScheduler.class); |
| when(rmContext.getScheduler()).thenReturn(rs); |
| } |
| |
| private ConcurrentHashMap<ApplicationId, RMApp> getRMApps( |
| RMContext rmContext, YarnScheduler yarnScheduler) { |
| ConcurrentHashMap<ApplicationId, RMApp> apps = |
| new ConcurrentHashMap<ApplicationId, RMApp>(); |
| ApplicationId applicationId1 = getApplicationId(1); |
| ApplicationId applicationId2 = getApplicationId(2); |
| ApplicationId applicationId3 = getApplicationId(3); |
| YarnConfiguration config = new YarnConfiguration(); |
| apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1, |
| config, "testqueue", 10, 3)); |
| apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2, |
| config, "a", 20, 2)); |
| apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3, |
| config, "testqueue", 40, 5)); |
| return apps; |
| } |
| |
| private List<ApplicationAttemptId> getSchedulerApps( |
| Map<ApplicationId, RMApp> apps) { |
| List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>(); |
| // Return app IDs for the apps in testqueue (as defined in getRMApps) |
| schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0)); |
| schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0)); |
| return schedApps; |
| } |
| |
| private static ApplicationId getApplicationId(int id) { |
| return ApplicationId.newInstance(123456, id); |
| } |
| |
| private static ApplicationAttemptId getApplicationAttemptId(int id) { |
| return ApplicationAttemptId.newInstance(getApplicationId(id), 1); |
| } |
| |
| private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, |
| ApplicationId applicationId3, YarnConfiguration config, String queueName, |
| final long memorySeconds, final long vcoreSeconds) { |
| ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); |
| when(asContext.getMaxAppAttempts()).thenReturn(1); |
| |
| RMAppImpl app = |
| spy(new RMAppImpl(applicationId3, rmContext, config, null, null, |
| queueName, asContext, yarnScheduler, null, |
| System.currentTimeMillis(), "YARN", null, |
| BuilderUtils.newResourceRequest( |
| RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, |
| Resource.newInstance(1024, 1), 1)){ |
| @Override |
| public ApplicationReport createAndGetApplicationReport( |
| String clientUserName, boolean allowAccess) { |
| ApplicationReport report = super.createAndGetApplicationReport( |
| clientUserName, allowAccess); |
| ApplicationResourceUsageReport usageReport = |
| report.getApplicationResourceUsageReport(); |
| usageReport.setMemorySeconds(memorySeconds); |
| usageReport.setVcoreSeconds(vcoreSeconds); |
| report.setApplicationResourceUsageReport(usageReport); |
| return report; |
| } |
| }); |
| |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| ApplicationId.newInstance(123456, 1), 1); |
| RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId, |
| rmContext, yarnScheduler, null, asContext, config, false, null)); |
| Container container = Container.newInstance( |
| ContainerId.newContainerId(attemptId, 1), null, "", null, null, null); |
| RMContainerImpl containerimpl = spy(new RMContainerImpl(container, |
| attemptId, null, "", rmContext)); |
| Map<ApplicationAttemptId, RMAppAttempt> attempts = |
| new HashMap<ApplicationAttemptId, RMAppAttempt>(); |
| attempts.put(attemptId, rmAppAttemptImpl); |
| when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); |
| when(app.getAppAttempts()).thenReturn(attempts); |
| when(rmAppAttemptImpl.getMasterContainer()).thenReturn(container); |
| ResourceScheduler rs = mock(ResourceScheduler.class); |
| when(rmContext.getScheduler()).thenReturn(rs); |
| when(rmContext.getScheduler().getRMContainer(any(ContainerId.class))) |
| .thenReturn(containerimpl); |
| SchedulerAppReport sAppReport = mock(SchedulerAppReport.class); |
| when( |
| rmContext.getScheduler().getSchedulerAppInfo( |
| any(ApplicationAttemptId.class))).thenReturn(sAppReport); |
| List<RMContainer> rmContainers = new ArrayList<RMContainer>(); |
| rmContainers.add(containerimpl); |
| when( |
| rmContext.getScheduler().getSchedulerAppInfo(attemptId) |
| .getLiveContainers()).thenReturn(rmContainers); |
| ContainerStatus cs = mock(ContainerStatus.class); |
| when(containerimpl.getFinishedStatus()).thenReturn(cs); |
| when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A"); |
| when(containerimpl.getContainerExitStatus()).thenReturn(0); |
| when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE); |
| return app; |
| } |
| |
| private static YarnScheduler mockYarnScheduler() { |
| YarnScheduler yarnScheduler = mock(YarnScheduler.class); |
| when(yarnScheduler.getMinimumResourceCapability()).thenReturn( |
| Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); |
| when(yarnScheduler.getMaximumResourceCapability()).thenReturn( |
| Resources.createResource( |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); |
| when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn( |
| Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102))); |
| when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn( |
| Arrays.asList(getApplicationAttemptId(103))); |
| ApplicationAttemptId attemptId = getApplicationAttemptId(1); |
| when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); |
| |
| ResourceCalculator rs = mock(ResourceCalculator.class); |
| when(yarnScheduler.getResourceCalculator()).thenReturn(rs); |
| |
| return yarnScheduler; |
| } |
| |
| @Test |
| public void testReservationAPIs() { |
| // initialize |
| CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); |
| ReservationSystemTestUtil.setupQueueConfiguration(conf); |
| conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| ResourceScheduler.class); |
| conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); |
| MockRM rm = new MockRM(conf); |
| rm.start(); |
| MockNM nm; |
| try { |
| nm = rm.registerNode("127.0.0.1:1", 102400, 100); |
| // allow plan follower to synchronize |
| Thread.sleep(1050); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| |
| // Create a client. |
| ClientRMService clientService = rm.getClientRMService(); |
| |
| // create a reservation |
| Clock clock = new UTCClock(); |
| long arrival = clock.getTime(); |
| long duration = 60000; |
| long deadline = (long) (arrival + 1.05 * duration); |
| ReservationSubmissionRequest sRequest = |
| createSimpleReservationRequest(4, arrival, deadline, duration); |
| ReservationSubmissionResponse sResponse = null; |
| try { |
| sResponse = clientService.submitReservation(sRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(sResponse); |
| ReservationId reservationID = sResponse.getReservationId(); |
| Assert.assertNotNull(reservationID); |
| LOG.info("Submit reservation response: " + reservationID); |
| |
| // Update the reservation |
| ReservationDefinition rDef = sRequest.getReservationDefinition(); |
| ReservationRequest rr = |
| rDef.getReservationRequests().getReservationResources().get(0); |
| rr.setNumContainers(5); |
| arrival = clock.getTime(); |
| duration = 30000; |
| deadline = (long) (arrival + 1.05 * duration); |
| rr.setDuration(duration); |
| rDef.setArrival(arrival); |
| rDef.setDeadline(deadline); |
| ReservationUpdateRequest uRequest = |
| ReservationUpdateRequest.newInstance(rDef, reservationID); |
| ReservationUpdateResponse uResponse = null; |
| try { |
| uResponse = clientService.updateReservation(uRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(sResponse); |
| LOG.info("Update reservation response: " + uResponse); |
| |
| // Delete the reservation |
| ReservationDeleteRequest dRequest = |
| ReservationDeleteRequest.newInstance(reservationID); |
| ReservationDeleteResponse dResponse = null; |
| try { |
| dResponse = clientService.deleteReservation(dRequest); |
| } catch (Exception e) { |
| Assert.fail(e.getMessage()); |
| } |
| Assert.assertNotNull(sResponse); |
| LOG.info("Delete reservation response: " + dResponse); |
| |
| // clean-up |
| rm.stop(); |
| nm = null; |
| rm = null; |
| } |
| |
| private ReservationSubmissionRequest createSimpleReservationRequest( |
| int numContainers, long arrival, long deadline, long duration) { |
| // create a request with a single atomic ask |
| ReservationRequest r = |
| ReservationRequest.newInstance(Resource.newInstance(1024, 1), |
| numContainers, 1, duration); |
| ReservationRequests reqs = |
| ReservationRequests.newInstance(Collections.singletonList(r), |
| ReservationRequestInterpreter.R_ALL); |
| ReservationDefinition rDef = |
| ReservationDefinition.newInstance(arrival, deadline, reqs, |
| "testClientRMService#reservation"); |
| ReservationSubmissionRequest request = |
| ReservationSubmissionRequest.newInstance(rDef, |
| ReservationSystemTestUtil.reservationQ); |
| return request; |
| } |
| |
| @Test |
| public void testGetNodeLabels() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, |
| this.queueACLsManager, this.getRMContext() |
| .getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| rm.start(); |
| NodeLabel labelX = NodeLabel.newInstance("x", false); |
| NodeLabel labelY = NodeLabel.newInstance("y"); |
| RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); |
| labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY)); |
| |
| NodeId node1 = NodeId.newInstance("host1", 1234); |
| NodeId node2 = NodeId.newInstance("host2", 1234); |
| Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); |
| map.put(node1, ImmutableSet.of("x")); |
| map.put(node2, ImmutableSet.of("y")); |
| labelsMgr.replaceLabelsOnNode(map); |
| |
| // Create a client. |
| Configuration conf = new Configuration(); |
| YarnRPC rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| ApplicationClientProtocol client = (ApplicationClientProtocol) rpc |
| .getProxy(ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Get node labels collection |
| GetClusterNodeLabelsResponse response = client |
| .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); |
| Assert.assertTrue(response.getNodeLabels().containsAll( |
| Arrays.asList(labelX, labelY))); |
| |
| // Get node labels mapping |
| GetNodesToLabelsResponse response1 = client |
| .getNodeToLabels(GetNodesToLabelsRequest.newInstance()); |
| Map<NodeId, Set<NodeLabel>> nodeToLabels = response1.getNodeToLabels(); |
| Assert.assertTrue(nodeToLabels.keySet().containsAll( |
| Arrays.asList(node1, node2))); |
| Assert.assertTrue(nodeToLabels.get(node1) |
| .containsAll(Arrays.asList(labelX))); |
| Assert.assertTrue(nodeToLabels.get(node2) |
| .containsAll(Arrays.asList(labelY))); |
| // Verify whether labelX's exclusivity is false |
| for (NodeLabel x : nodeToLabels.get(node1)) { |
| Assert.assertFalse(x.isExclusive()); |
| } |
| // Verify whether labelY's exclusivity is true |
| for (NodeLabel y : nodeToLabels.get(node2)) { |
| Assert.assertTrue(y.isExclusive()); |
| } |
| // Below label "x" is not present in the response as exclusivity is true |
| Assert.assertFalse(nodeToLabels.get(node1).containsAll( |
| Arrays.asList(NodeLabel.newInstance("x")))); |
| |
| rpc.stopProxy(client, conf); |
| rm.close(); |
| } |
| |
| @Test |
| public void testGetLabelsToNodes() throws Exception { |
| MockRM rm = new MockRM() { |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, |
| this.rmAppManager, this.applicationACLsManager, |
| this.queueACLsManager, this.getRMContext() |
| .getRMDelegationTokenSecretManager()); |
| }; |
| }; |
| rm.start(); |
| |
| NodeLabel labelX = NodeLabel.newInstance("x", false); |
| NodeLabel labelY = NodeLabel.newInstance("y", false); |
| NodeLabel labelZ = NodeLabel.newInstance("z", false); |
| RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); |
| labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY, labelZ)); |
| |
| NodeId node1A = NodeId.newInstance("host1", 1234); |
| NodeId node1B = NodeId.newInstance("host1", 5678); |
| NodeId node2A = NodeId.newInstance("host2", 1234); |
| NodeId node3A = NodeId.newInstance("host3", 1234); |
| NodeId node3B = NodeId.newInstance("host3", 5678); |
| Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); |
| map.put(node1A, ImmutableSet.of("x")); |
| map.put(node1B, ImmutableSet.of("z")); |
| map.put(node2A, ImmutableSet.of("y")); |
| map.put(node3A, ImmutableSet.of("y")); |
| map.put(node3B, ImmutableSet.of("z")); |
| labelsMgr.replaceLabelsOnNode(map); |
| |
| // Create a client. |
| Configuration conf = new Configuration(); |
| YarnRPC rpc = YarnRPC.create(conf); |
| InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); |
| LOG.info("Connecting to ResourceManager at " + rmAddress); |
| ApplicationClientProtocol client = (ApplicationClientProtocol) rpc |
| .getProxy(ApplicationClientProtocol.class, rmAddress, conf); |
| |
| // Get node labels collection |
| GetClusterNodeLabelsResponse response = client |
| .getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); |
| Assert.assertTrue(response.getNodeLabels().containsAll( |
| Arrays.asList(labelX, labelY, labelZ))); |
| |
| // Get labels to nodes mapping |
| GetLabelsToNodesResponse response1 = client |
| .getLabelsToNodes(GetLabelsToNodesRequest.newInstance()); |
| Map<NodeLabel, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes(); |
| // Verify whether all NodeLabel's exclusivity are false |
| for (Map.Entry<NodeLabel, Set<NodeId>> nltn : labelsToNodes.entrySet()) { |
| Assert.assertFalse(nltn.getKey().isExclusive()); |
| } |
| Assert.assertTrue(labelsToNodes.keySet().containsAll( |
| Arrays.asList(labelX, labelY, labelZ))); |
| Assert.assertTrue(labelsToNodes.get(labelX).containsAll( |
| Arrays.asList(node1A))); |
| Assert.assertTrue(labelsToNodes.get(labelY).containsAll( |
| Arrays.asList(node2A, node3A))); |
| Assert.assertTrue(labelsToNodes.get(labelZ).containsAll( |
| Arrays.asList(node1B, node3B))); |
| |
| // Get labels to nodes mapping for specific labels |
| Set<String> setlabels = new HashSet<String>(Arrays.asList(new String[]{"x", |
| "z"})); |
| GetLabelsToNodesResponse response2 = client |
| .getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels)); |
| labelsToNodes = response2.getLabelsToNodes(); |
| // Verify whether all NodeLabel's exclusivity are false |
| for (Map.Entry<NodeLabel, Set<NodeId>> nltn : labelsToNodes.entrySet()) { |
| Assert.assertFalse(nltn.getKey().isExclusive()); |
| } |
| Assert.assertTrue(labelsToNodes.keySet().containsAll( |
| Arrays.asList(labelX, labelZ))); |
| Assert.assertTrue(labelsToNodes.get(labelX).containsAll( |
| Arrays.asList(node1A))); |
| Assert.assertTrue(labelsToNodes.get(labelZ).containsAll( |
| Arrays.asList(node1B, node3B))); |
| Assert.assertEquals(labelsToNodes.get(labelY), null); |
| |
| rpc.stopProxy(client, conf); |
| rm.close(); |
| } |
| } |