| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.router.clientrm; |
| |
| import static org.mockito.Mockito.mock; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.HashMap; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.Arrays; |
| |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.test.LambdaTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.yarn.MockApps; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; |
| import org.apache.hadoop.yarn.api.records.SignalContainerCommand; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeKey; |
| import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; |
| import org.apache.hadoop.yarn.api.records.NodeAttribute; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeType; |
| import org.apache.hadoop.yarn.api.records.ReservationRequest; |
| import org.apache.hadoop.yarn.api.records.ReservationDefinition; |
| import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; |
| import org.apache.hadoop.yarn.api.records.ReservationRequests; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; |
| import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; |
| import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; |
| import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockNM; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.util.Times; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.Assert; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to |
| * use the {@code RouterClientRMService} pipeline test cases for testing the |
| * {@code FederationInterceptor} class. The tests for |
| * {@code RouterClientRMService} has been written cleverly so that it can be |
| * reused to validate different request interceptor chains. |
| */ |
| public class TestFederationClientInterceptor extends BaseRouterClientRMTest { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestFederationClientInterceptor.class); |
| |
| private TestableFederationClientInterceptor interceptor; |
| private MemoryFederationStateStore stateStore; |
| private FederationStateStoreTestUtil stateStoreUtil; |
| private List<SubClusterId> subClusters; |
| |
| private String user = "test-user"; |
| |
| private final static int NUM_SUBCLUSTER = 4; |
| |
| private final static int APP_PRIORITY_ZERO = 0; |
| |
| private final static long DEFAULT_DURATION = 10 * 60 * 1000; |
| |
| @Override |
| public void setUp() { |
| super.setUpConfig(); |
| interceptor = new TestableFederationClientInterceptor(); |
| |
| stateStore = new MemoryFederationStateStore(); |
| stateStore.init(this.getConf()); |
| FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); |
| stateStoreUtil = new FederationStateStoreTestUtil(stateStore); |
| |
| interceptor.setConf(this.getConf()); |
| interceptor.init(user); |
| |
| subClusters = new ArrayList<>(); |
| |
| try { |
| for (int i = 0; i < NUM_SUBCLUSTER; i++) { |
| SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); |
| stateStoreUtil.registerSubCluster(sc); |
| subClusters.add(sc); |
| } |
| } catch (YarnException e) { |
| LOG.error(e.getMessage()); |
| Assert.fail(); |
| } |
| |
| DefaultMetricsSystem.setMiniClusterMode(true); |
| } |
| |
| @Override |
| public void tearDown() { |
| interceptor.shutdown(); |
| super.tearDown(); |
| } |
| |
| @Override |
| protected YarnConfiguration createConfiguration() { |
| YarnConfiguration conf = new YarnConfiguration(); |
| conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); |
| String mockPassThroughInterceptorClass = |
| PassThroughClientRequestInterceptor.class.getName(); |
| |
| // Create a request interceptor pipeline for testing. The last one in the |
| // chain is the federation interceptor that calls the mock resource manager. |
| // The others in the chain will simply forward it to the next one in the |
| // chain |
| conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, |
| mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass |
| + "," + TestableFederationClientInterceptor.class.getName()); |
| |
| conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, |
| UniformBroadcastPolicyManager.class.getName()); |
| |
| // Disable StateStoreFacade cache |
| conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); |
| |
| conf.setInt("yarn.scheduler.minimum-allocation-mb", 512); |
| conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1); |
| conf.setInt("yarn.scheduler.maximum-allocation-mb", 100 * 1024); |
| conf.setInt("yarn.scheduler.maximum-allocation-vcores", 100); |
| |
| return conf; |
| } |
| |
| /** |
| * This test validates the correctness of GetNewApplication. The return |
| * ApplicationId has to belong to one of the SubCluster in the cluster. |
| */ |
| @Test |
| public void testGetNewApplication() throws YarnException, IOException { |
| LOG.info("Test FederationClientInterceptor: Get New Application."); |
| |
| GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); |
| GetNewApplicationResponse response = interceptor.getNewApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(response.getApplicationId()); |
| Assert.assertEquals(response.getApplicationId().getClusterTimestamp(), |
| ResourceManager.getClusterTimeStamp()); |
| } |
| |
| /** |
| * This test validates the correctness of SubmitApplication. The application |
| * has to be submitted to one of the SubCluster in the cluster. |
| */ |
| @Test |
| public void testSubmitApplication() |
| throws YarnException, IOException { |
| LOG.info("Test FederationClientInterceptor: Submit Application."); |
| |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId); |
| Assert.assertNotNull(scIdResult); |
| Assert.assertTrue(subClusters.contains(scIdResult)); |
| } |
| |
| private SubmitApplicationRequest mockSubmitApplicationRequest( |
| ApplicationId appId) { |
| ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); |
| ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance( |
| appId, MockApps.newAppName(), "default", |
| Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1, |
| Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), |
| "MockApp"); |
| SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context); |
| return request; |
| } |
| |
| /** |
| * This test validates the correctness of SubmitApplication in case of |
| * multiple submission. The first retry has to be submitted to the same |
| * SubCluster of the first attempt. |
| */ |
| @Test |
| public void testSubmitApplicationMultipleSubmission() |
| throws YarnException, IOException, InterruptedException { |
| LOG.info( |
| "Test FederationClientInterceptor: Submit Application - Multiple"); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // First attempt |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId); |
| Assert.assertNotNull(scIdResult); |
| |
| // First retry |
| response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId); |
| Assert.assertNotNull(scIdResult2); |
| Assert.assertEquals(scIdResult, scIdResult); |
| } |
| |
| /** |
| * This test validates the correctness of SubmitApplication in case of empty |
| * request. |
| */ |
| @Test |
| public void testSubmitApplicationEmptyRequest() |
| throws Exception { |
| LOG.info("Test FederationClientInterceptor: Submit Application - Empty."); |
| |
| // null request1 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing submitApplication request or applicationSubmissionContext information.", |
| () -> interceptor.submitApplication(null)); |
| |
| // null request2 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing submitApplication request or applicationSubmissionContext information.", |
| () -> interceptor.submitApplication(SubmitApplicationRequest.newInstance(null))); |
| |
| // null request3 |
| ApplicationSubmissionContext context = ApplicationSubmissionContext |
| .newInstance(null, "", "", null, null, false, false, -1, null, null); |
| SubmitApplicationRequest request = |
| SubmitApplicationRequest.newInstance(context); |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing submitApplication request or applicationSubmissionContext information.", |
| () -> interceptor.submitApplication(request)); |
| } |
| |
| /** |
| * This test validates the correctness of ForceKillApplication in case the |
| * application exists in the cluster. |
| */ |
| @Test |
| public void testForceKillApplication() |
| throws YarnException, IOException, InterruptedException { |
| LOG.info("Test FederationClientInterceptor: Force Kill Application."); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application we are going to kill later |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId); |
| KillApplicationResponse responseKill = interceptor.forceKillApplication(requestKill); |
| Assert.assertNotNull(responseKill); |
| } |
| |
| /** |
| * This test validates the correctness of ForceKillApplication in case of |
| * application does not exist in StateStore. |
| */ |
| @Test |
| public void testForceKillApplicationNotExists() throws Exception { |
| LOG.info("Test FederationClientInterceptor: Force Kill Application - Not Exists"); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| KillApplicationRequest requestKill = |
| KillApplicationRequest.newInstance(appId); |
| |
| LambdaTestUtils.intercept(YarnException.class, |
| "Application " + appId + " does not exist in FederationStateStore.", |
| () -> interceptor.forceKillApplication(requestKill)); |
| } |
| |
| /** |
| * This test validates the correctness of ForceKillApplication in case of |
| * empty request. |
| */ |
| @Test |
| public void testForceKillApplicationEmptyRequest() |
| throws Exception { |
| LOG.info("Test FederationClientInterceptor: Force Kill Application - Empty."); |
| |
| // null request1 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing forceKillApplication request or ApplicationId.", |
| () -> interceptor.forceKillApplication(null)); |
| |
| // null request2 |
| KillApplicationRequest killRequest = KillApplicationRequest.newInstance(null); |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing forceKillApplication request or ApplicationId.", |
| () -> interceptor.forceKillApplication(killRequest)); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationReport in case the |
| * application exists in the cluster. |
| */ |
| @Test |
| public void testGetApplicationReport() |
| throws YarnException, IOException, InterruptedException { |
| LOG.info("Test FederationClientInterceptor: Get Application Report"); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application we want the report later |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| GetApplicationReportRequest requestGet = |
| GetApplicationReportRequest.newInstance(appId); |
| |
| GetApplicationReportResponse responseGet = |
| interceptor.getApplicationReport(requestGet); |
| |
| Assert.assertNotNull(responseGet); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationReport in case the |
| * application does not exist in StateStore. |
| */ |
| @Test |
| public void testGetApplicationNotExists() |
| throws Exception { |
| LOG.info("Test ApplicationClientProtocol: Get Application Report - Not Exists."); |
| |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId); |
| LambdaTestUtils.intercept(YarnException.class, |
| "Application " + appId + " does not exist in FederationStateStore.", |
| () -> interceptor.getApplicationReport(requestGet)); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationReport in case of |
| * empty request. |
| */ |
| @Test |
| public void testGetApplicationEmptyRequest() |
| throws Exception { |
| LOG.info("Test FederationClientInterceptor: Get Application Report - Empty."); |
| |
| // null request1 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getApplicationReport request or applicationId information.", |
| () -> interceptor.getApplicationReport(null)); |
| |
| // null request2 |
| GetApplicationReportRequest reportRequest = GetApplicationReportRequest.newInstance(null); |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getApplicationReport request or applicationId information.", |
| () -> interceptor.getApplicationReport(reportRequest)); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationAttemptReport in case the |
| * application exists in the cluster. |
| */ |
| @Test |
| public void testGetApplicationAttemptReport() |
| throws YarnException, IOException, InterruptedException { |
| LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report."); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application we want the applicationAttempt report later |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| // Call GetApplicationAttempts Get ApplicationAttemptId |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| |
| // Wait for app to start |
| while(attemptsResponse.getApplicationAttemptList().size() == 0) { |
| attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| } |
| |
| Assert.assertNotNull(attemptsResponse); |
| |
| GetApplicationAttemptReportRequest requestGet = |
| GetApplicationAttemptReportRequest.newInstance( |
| attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId()); |
| |
| GetApplicationAttemptReportResponse responseGet = |
| interceptor.getApplicationAttemptReport(requestGet); |
| |
| Assert.assertNotNull(responseGet); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationAttemptReport in case the |
| * application does not exist in StateStore. |
| */ |
| @Test |
| public void testGetApplicationAttemptNotExists() throws Exception { |
| LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Not Exists."); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| ApplicationAttemptId appAttemptID = |
| ApplicationAttemptId.newInstance(appId, 1); |
| GetApplicationAttemptReportRequest requestGet = |
| GetApplicationAttemptReportRequest.newInstance(appAttemptID); |
| |
| LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " + |
| appAttemptID + " belongs to Application " + |
| appId + " does not exist in FederationStateStore.", |
| () -> interceptor.getApplicationAttemptReport(requestGet)); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationAttemptReport in case of |
| * empty request. |
| */ |
| @Test |
| public void testGetApplicationAttemptEmptyRequest() |
| throws Exception { |
| LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty."); |
| |
| // null request1 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getApplicationAttemptReport request or applicationId " + |
| "or applicationAttemptId information.", |
| () -> interceptor.getApplicationAttemptReport(null)); |
| |
| // null request2 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getApplicationAttemptReport request or applicationId " + |
| "or applicationAttemptId information.", |
| () -> interceptor.getApplicationAttemptReport( |
| GetApplicationAttemptReportRequest.newInstance(null))); |
| |
| // null request3 |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getApplicationAttemptReport request or applicationId " + |
| "or applicationAttemptId information.", |
| () -> interceptor.getApplicationAttemptReport( |
| GetApplicationAttemptReportRequest.newInstance( |
| ApplicationAttemptId.newInstance(null, 1)))); |
| } |
| |
| |
| @Test |
| public void testGetClusterMetricsRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getClusterMetrics request.", |
| () -> interceptor.getClusterMetrics(null)); |
| |
| // normal request. |
| GetClusterMetricsResponse response = |
| interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance()); |
| Assert.assertEquals(subClusters.size(), |
| response.getClusterMetrics().getNumNodeManagers()); |
| |
| ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", |
| new Class[] {GetClusterMetricsRequest.class}, |
| new Object[] {GetClusterMetricsRequest.newInstance()}); |
| Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor. |
| invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class); |
| Assert.assertTrue(clusterMetrics.isEmpty()); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationsResponse in case the |
| * application exists in the cluster. |
| */ |
| @Test |
| public void testGetApplicationsResponse() |
| throws YarnException, IOException, InterruptedException { |
| LOG.info("Test FederationClientInterceptor: Get Applications Response."); |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| Set<String> appTypes = Collections.singleton("MockApp"); |
| GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes); |
| GetApplicationsResponse responseGet = interceptor.getApplications(requestGet); |
| |
| Assert.assertNotNull(responseGet); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationsResponse in case of |
| * empty request. |
| */ |
| @Test |
| public void testGetApplicationsNullRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor: Get Applications request."); |
| LambdaTestUtils.intercept(YarnException.class, "Missing getApplications request.", |
| () -> interceptor.getApplications(null)); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationsResponse in case applications |
| * with given type does not exist. |
| */ |
| @Test |
| public void testGetApplicationsApplicationTypeNotExists() throws Exception{ |
| LOG.info("Test FederationClientInterceptor: Application with type does not exist."); |
| |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| Set<String> appTypes = Collections.singleton("SPARK"); |
| |
| GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes); |
| GetApplicationsResponse responseGet = interceptor.getApplications(requestGet); |
| |
| Assert.assertNotNull(responseGet); |
| Assert.assertTrue(responseGet.getApplicationList().isEmpty()); |
| } |
| |
| /** |
| * This test validates the correctness of GetApplicationsResponse in case applications |
| * with given YarnApplicationState does not exist. |
| */ |
| @Test |
| public void testGetApplicationsApplicationStateNotExists() throws Exception { |
| LOG.info("Test FederationClientInterceptor: Application with state does not exist."); |
| |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| EnumSet<YarnApplicationState> applicationStates = EnumSet.noneOf( |
| YarnApplicationState.class); |
| applicationStates.add(YarnApplicationState.KILLED); |
| |
| GetApplicationsRequest requestGet = |
| GetApplicationsRequest.newInstance(applicationStates); |
| |
| GetApplicationsResponse responseGet = interceptor.getApplications(requestGet); |
| |
| Assert.assertNotNull(responseGet); |
| Assert.assertTrue(responseGet.getApplicationList().isEmpty()); |
| } |
| |
| @Test |
| public void testGetClusterNodesRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request."); |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.", |
| () -> interceptor.getClusterNodes(null)); |
| // normal request. |
| GetClusterNodesResponse response = |
| interceptor.getClusterNodes(GetClusterNodesRequest.newInstance()); |
| Assert.assertEquals(subClusters.size(), response.getNodeReports().size()); |
| } |
| |
| @Test |
| public void testGetNodeToLabelsRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Node To Labels request."); |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.", |
| () -> interceptor.getNodeToLabels(null)); |
| // normal request. |
| GetNodesToLabelsResponse response = |
| interceptor.getNodeToLabels(GetNodesToLabelsRequest.newInstance()); |
| Assert.assertEquals(0, response.getNodeToLabels().size()); |
| } |
| |
| @Test |
| public void testGetLabelsToNodesRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Labels To Node request."); |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.", |
| () -> interceptor.getLabelsToNodes(null)); |
| // normal request. |
| GetLabelsToNodesResponse response = |
| interceptor.getLabelsToNodes(GetLabelsToNodesRequest.newInstance()); |
| Assert.assertEquals(0, response.getLabelsToNodes().size()); |
| } |
| |
| @Test |
| public void testClusterNodeLabelsRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request."); |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.", |
| () -> interceptor.getClusterNodeLabels(null)); |
| // normal request. |
| GetClusterNodeLabelsResponse response = |
| interceptor.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance()); |
| Assert.assertEquals(0, response.getNodeLabelList().size()); |
| } |
| |
| @Test |
| public void testGetQueueUserAcls() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.", |
| () -> interceptor.getQueueUserAcls(null)); |
| |
| // normal request |
| GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls( |
| GetQueueUserAclsInfoRequest.newInstance()); |
| |
| Assert.assertNotNull(response); |
| |
| List<QueueACL> submitAndAdministerAcl = new ArrayList<>(); |
| submitAndAdministerAcl.add(QueueACL.SUBMIT_APPLICATIONS); |
| submitAndAdministerAcl.add(QueueACL.ADMINISTER_QUEUE); |
| |
| QueueUserACLInfo exceptRootQueueACLInfo = QueueUserACLInfo.newInstance("root", |
| submitAndAdministerAcl); |
| |
| QueueUserACLInfo queueRootQueueACLInfo = response.getUserAclsInfoList().stream(). |
| filter(acl->acl.getQueueName().equals("root")). |
| collect(Collectors.toList()).get(0); |
| |
| Assert.assertEquals(exceptRootQueueACLInfo, queueRootQueueACLInfo); |
| } |
| |
| @Test |
| public void testListReservations() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get ListReservations request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.", |
| () -> interceptor.listReservations(null)); |
| |
| // normal request |
| ReservationId reservationId = ReservationId.newInstance(1653487680L, 1L); |
| ReservationListResponse response = interceptor.listReservations( |
| ReservationListRequest.newInstance("root.decided", reservationId.toString())); |
| Assert.assertNotNull(response); |
| Assert.assertEquals(0, response.getReservationAllocationState().size()); |
| } |
| |
| @Test |
| public void testGetContainersRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Containers request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " + |
| "or ApplicationAttemptId.", () -> interceptor.getContainers(null)); |
| |
| // normal request |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| // Call GetApplicationAttempts |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| |
| // Wait for app to start |
| while(attemptsResponse.getApplicationAttemptList().size() == 0) { |
| attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| } |
| |
| Assert.assertNotNull(attemptsResponse); |
| |
| // Call GetContainers |
| GetContainersRequest containersRequest = |
| GetContainersRequest.newInstance( |
| attemptsResponse.getApplicationAttemptList().get(0).getApplicationAttemptId()); |
| GetContainersResponse containersResponse = |
| interceptor.getContainers(containersRequest); |
| |
| Assert.assertNotNull(containersResponse); |
| } |
| |
| @Test |
| public void testGetContainerReportRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Container Report request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getContainerReport request " + |
| "or containerId", () -> interceptor.getContainerReport(null)); |
| |
| // normal request |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| // Call GetApplicationAttempts |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| |
| // Wait for app to start |
| while(attemptsResponse.getApplicationAttemptList().size() == 0) { |
| attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| } |
| Assert.assertNotNull(attemptsResponse); |
| |
| ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList(). |
| get(0).getApplicationAttemptId(); |
| ContainerId containerId = ContainerId.newContainerId(attemptId, 1); |
| |
| // Call ContainerReport, RM does not allocate Container, Here is null |
| GetContainerReportRequest containerReportRequest = |
| GetContainerReportRequest.newInstance(containerId); |
| GetContainerReportResponse containerReportResponse = |
| interceptor.getContainerReport(containerReportRequest); |
| |
| Assert.assertEquals(containerReportResponse, null); |
| } |
| |
| @Test |
| public void getApplicationAttempts() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Application Attempts request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getApplicationAttempts " + |
| "request or application id.", () -> interceptor.getApplicationAttempts(null)); |
| |
| // normal request |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| // Call GetApplicationAttempts |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| |
| Assert.assertNotNull(attemptsResponse); |
| } |
| |
| @Test |
| public void testGetResourceTypeInfoRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request."); |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.", |
| () -> interceptor.getResourceTypeInfo(null)); |
| // normal request. |
| GetAllResourceTypeInfoResponse response = |
| interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance()); |
| Assert.assertEquals(2, response.getResourceTypeInfo().size()); |
| } |
| |
| @Test |
| public void testFailApplicationAttempt() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Fail Application Attempt request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " + |
| "or applicationId or applicationAttemptId information.", |
| () -> interceptor.failApplicationAttempt(null)); |
| |
| // normal request |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); |
| Assert.assertNotNull(subClusterId); |
| |
| MockRM mockRM = interceptor.getMockRMs().get(subClusterId); |
| mockRM.waitForState(appId, RMAppState.ACCEPTED); |
| RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); |
| mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), |
| RMAppAttemptState.SCHEDULED); |
| |
| // Call GetApplicationAttempts |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| Assert.assertNotNull(attemptsResponse); |
| |
| ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList(). |
| get(0).getApplicationAttemptId(); |
| |
| FailApplicationAttemptRequest requestFailAppAttempt = |
| FailApplicationAttemptRequest.newInstance(attemptId); |
| FailApplicationAttemptResponse responseFailAppAttempt = |
| interceptor.failApplicationAttempt(requestFailAppAttempt); |
| |
| Assert.assertNotNull(responseFailAppAttempt); |
| } |
| |
| @Test |
| public void testUpdateApplicationPriority() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Update Application Priority request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " + |
| "or applicationId or applicationPriority information.", |
| () -> interceptor.updateApplicationPriority(null)); |
| |
| // normal request |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); |
| Assert.assertNotNull(subClusterId); |
| |
| MockRM mockRM = interceptor.getMockRMs().get(subClusterId); |
| mockRM.waitForState(appId, RMAppState.ACCEPTED); |
| RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); |
| mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), |
| RMAppAttemptState.SCHEDULED); |
| |
| // Call GetApplicationAttempts |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| Assert.assertNotNull(attemptsResponse); |
| |
| Priority priority = Priority.newInstance(20); |
| UpdateApplicationPriorityRequest requestUpdateAppPriority = |
| UpdateApplicationPriorityRequest.newInstance(appId, priority); |
| UpdateApplicationPriorityResponse responseAppPriority = |
| interceptor.updateApplicationPriority(requestUpdateAppPriority); |
| |
| Assert.assertNotNull(responseAppPriority); |
| Assert.assertEquals(20, |
| responseAppPriority.getApplicationPriority().getPriority()); |
| } |
| |
| @Test |
| public void testUpdateApplicationTimeouts() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Update Application Timeouts request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " + |
| "or applicationId or applicationTimeouts information.", |
| () -> interceptor.updateApplicationTimeouts(null)); |
| |
| // normal request |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); |
| Assert.assertNotNull(subClusterId); |
| |
| MockRM mockRM = interceptor.getMockRMs().get(subClusterId); |
| mockRM.waitForState(appId, RMAppState.ACCEPTED); |
| RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); |
| mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), |
| RMAppAttemptState.SCHEDULED); |
| |
| // Call GetApplicationAttempts |
| GetApplicationAttemptsRequest attemptsRequest = |
| GetApplicationAttemptsRequest.newInstance(appId); |
| GetApplicationAttemptsResponse attemptsResponse = |
| interceptor.getApplicationAttempts(attemptsRequest); |
| Assert.assertNotNull(attemptsResponse); |
| |
| String appTimeout = |
| Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); |
| Map<ApplicationTimeoutType, String> applicationTimeouts = new HashMap<>(); |
| applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout); |
| |
| UpdateApplicationTimeoutsRequest timeoutsRequest = |
| UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts); |
| UpdateApplicationTimeoutsResponse timeoutsResponse = |
| interceptor.updateApplicationTimeouts(timeoutsRequest); |
| |
| String responseTimeOut = |
| timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); |
| Assert.assertNotNull(timeoutsResponse); |
| Assert.assertEquals(appTimeout, responseTimeOut); |
| } |
| |
| @Test |
| public void testSignalContainer() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Signal Container request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " + |
| "or containerId or command information.", () -> interceptor.signalToContainer(null)); |
| |
| // normal request |
| ApplicationId appId = |
| ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); |
| Assert.assertNotNull(subClusterId); |
| |
| MockRM mockRM = interceptor.getMockRMs().get(subClusterId); |
| mockRM.waitForState(appId, RMAppState.ACCEPTED); |
| RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); |
| mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), |
| RMAppAttemptState.SCHEDULED); |
| MockNM nm = interceptor.getMockNMs().get(subClusterId); |
| nm.nodeHeartbeat(true); |
| MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); |
| mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); |
| |
| ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId(); |
| |
| SignalContainerRequest signalContainerRequest = |
| SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN); |
| SignalContainerResponse signalContainerResponse = |
| interceptor.signalToContainer(signalContainerRequest); |
| |
| Assert.assertNotNull(signalContainerResponse); |
| } |
| |
| @Test |
| public void testMoveApplicationAcrossQueues() throws Exception { |
| LOG.info("Test FederationClientInterceptor : MoveApplication AcrossQueues request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing moveApplicationAcrossQueues request " + |
| "or applicationId or target queue.", () -> interceptor.moveApplicationAcrossQueues(null)); |
| |
| // normal request |
| ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); |
| SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); |
| |
| // Submit the application |
| SubmitApplicationResponse response = interceptor.submitApplication(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); |
| |
| SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); |
| Assert.assertNotNull(subClusterId); |
| |
| MockRM mockRM = interceptor.getMockRMs().get(subClusterId); |
| mockRM.waitForState(appId, RMAppState.ACCEPTED); |
| RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); |
| mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), |
| RMAppAttemptState.SCHEDULED); |
| MockNM nm = interceptor.getMockNMs().get(subClusterId); |
| nm.nodeHeartbeat(true); |
| MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); |
| mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); |
| |
| MoveApplicationAcrossQueuesRequest acrossQueuesRequest = |
| MoveApplicationAcrossQueuesRequest.newInstance(appId, "root.target"); |
| MoveApplicationAcrossQueuesResponse acrossQueuesResponse = |
| interceptor.moveApplicationAcrossQueues(acrossQueuesRequest); |
| |
| Assert.assertNotNull(acrossQueuesResponse); |
| } |
| |
| |
| @Test |
| public void testGetQueueInfo() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Queue Info request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getQueueInfo request or queueName.", |
| () -> interceptor.getQueueInfo(null)); |
| |
| // normal request |
| GetQueueInfoResponse response = interceptor.getQueueInfo( |
| GetQueueInfoRequest.newInstance("root", true, true, true)); |
| |
| Assert.assertNotNull(response); |
| |
| QueueInfo queueInfo = response.getQueueInfo(); |
| Assert.assertNotNull(queueInfo); |
| Assert.assertEquals(queueInfo.getQueueName(), "root"); |
| Assert.assertEquals(queueInfo.getCapacity(), 4.0, 0); |
| Assert.assertEquals(queueInfo.getCurrentCapacity(), 0.0, 0); |
| Assert.assertEquals(queueInfo.getChildQueues().size(), 12, 0); |
| Assert.assertEquals(queueInfo.getAccessibleNodeLabels().size(), 1); |
| } |
| |
| @Test |
| public void testGetResourceProfiles() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Resource Profiles request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getResourceProfiles request.", |
| () -> interceptor.getResourceProfiles(null)); |
| |
| // normal request |
| GetAllResourceProfilesRequest request = GetAllResourceProfilesRequest.newInstance(); |
| GetAllResourceProfilesResponse response = interceptor.getResourceProfiles(request); |
| |
| Assert.assertNotNull(response); |
| Map<String, Resource> resProfiles = response.getResourceProfiles(); |
| |
| Resource maxResProfiles = resProfiles.get("maximum"); |
| Assert.assertEquals(32768, maxResProfiles.getMemorySize()); |
| Assert.assertEquals(16, maxResProfiles.getVirtualCores()); |
| |
| Resource defaultResProfiles = resProfiles.get("default"); |
| Assert.assertEquals(8192, defaultResProfiles.getMemorySize()); |
| Assert.assertEquals(8, defaultResProfiles.getVirtualCores()); |
| |
| Resource minimumResProfiles = resProfiles.get("minimum"); |
| Assert.assertEquals(4096, minimumResProfiles.getMemorySize()); |
| Assert.assertEquals(4, minimumResProfiles.getVirtualCores()); |
| } |
| |
| @Test |
| public void testGetResourceProfile() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get Resource Profile request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getResourceProfile request or profileName.", |
| () -> interceptor.getResourceProfile(null)); |
| |
| // normal request |
| GetResourceProfileRequest request = GetResourceProfileRequest.newInstance("maximum"); |
| GetResourceProfileResponse response = interceptor.getResourceProfile(request); |
| |
| Assert.assertNotNull(response); |
| Assert.assertEquals(32768, response.getResource().getMemorySize()); |
| Assert.assertEquals(16, response.getResource().getVirtualCores()); |
| |
| GetResourceProfileRequest request2 = GetResourceProfileRequest.newInstance("default"); |
| GetResourceProfileResponse response2 = interceptor.getResourceProfile(request2); |
| |
| Assert.assertNotNull(response2); |
| Assert.assertEquals(8192, response2.getResource().getMemorySize()); |
| Assert.assertEquals(8, response2.getResource().getVirtualCores()); |
| |
| GetResourceProfileRequest request3 = GetResourceProfileRequest.newInstance("minimum"); |
| GetResourceProfileResponse response3 = interceptor.getResourceProfile(request3); |
| |
| Assert.assertNotNull(response3); |
| Assert.assertEquals(4096, response3.getResource().getMemorySize()); |
| Assert.assertEquals(4, response3.getResource().getVirtualCores()); |
| } |
| |
| @Test |
| public void testGetAttributesToNodes() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " + |
| "or nodeAttributes.", () -> interceptor.getAttributesToNodes(null)); |
| |
| // normal request |
| GetAttributesToNodesResponse response = |
| interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance()); |
| |
| Assert.assertNotNull(response); |
| Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes(); |
| Assert.assertNotNull(attrs); |
| Assert.assertEquals(4, attrs.size()); |
| |
| NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", |
| NodeAttributeType.STRING, "nvidia"); |
| NodeToAttributeValue attributeValue1 = |
| NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue()); |
| NodeAttributeKey gpuKey = gpu.getAttributeKey(); |
| Assert.assertTrue(attrs.get(gpuKey).contains(attributeValue1)); |
| } |
| |
| @Test |
| public void testClusterNodeAttributes() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.", |
| () -> interceptor.getClusterNodeAttributes(null)); |
| |
| // normal request |
| GetClusterNodeAttributesResponse response = |
| interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance()); |
| |
| Assert.assertNotNull(response); |
| Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes(); |
| Assert.assertNotNull(nodeAttributeInfos); |
| Assert.assertEquals(4, nodeAttributeInfos.size()); |
| |
| NodeAttributeInfo nodeAttributeInfo1 = |
| NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), |
| NodeAttributeType.STRING); |
| Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1)); |
| |
| NodeAttributeInfo nodeAttributeInfo2 = |
| NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"), |
| NodeAttributeType.STRING); |
| Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2)); |
| } |
| |
| @Test |
| public void testNodesToAttributes() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getNodesToAttributes request or hostNames.", |
| () -> interceptor.getNodesToAttributes(null)); |
| |
| // normal request |
| Set<String> hostNames = Collections.singleton("0-host1"); |
| GetNodesToAttributesResponse response = |
| interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames)); |
| Assert.assertNotNull(response); |
| |
| Map<String, Set<NodeAttribute>> nodeAttributeMap = response.getNodeToAttributes(); |
| Assert.assertNotNull(nodeAttributeMap); |
| Assert.assertEquals(1, nodeAttributeMap.size()); |
| |
| NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", |
| NodeAttributeType.STRING, "nvida"); |
| Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu)); |
| } |
| |
| @Test |
| public void testGetNewReservation() throws Exception { |
| LOG.info("Test FederationClientInterceptor : Get NewReservation request."); |
| |
| // null request |
| LambdaTestUtils.intercept(YarnException.class, |
| "Missing getNewReservation request.", () -> interceptor.getNewReservation(null)); |
| |
| // normal request |
| GetNewReservationRequest request = GetNewReservationRequest.newInstance(); |
| GetNewReservationResponse response = interceptor.getNewReservation(request); |
| Assert.assertNotNull(response); |
| |
| ReservationId reservationId = response.getReservationId(); |
| Assert.assertNotNull(reservationId); |
| Assert.assertTrue(reservationId.toString().contains("reservation")); |
| Assert.assertEquals(reservationId.getClusterTimestamp(), ResourceManager.getClusterTimeStamp()); |
| } |
| |
| @Test |
| public void testSubmitReservation() throws Exception { |
| LOG.info("Test FederationClientInterceptor : SubmitReservation request."); |
| |
| // get new reservationId |
| GetNewReservationRequest request = GetNewReservationRequest.newInstance(); |
| GetNewReservationResponse response = interceptor.getNewReservation(request); |
| Assert.assertNotNull(response); |
| |
| // Submit Reservation |
| ReservationId reservationId = response.getReservationId(); |
| ReservationDefinition rDefinition = createReservationDefinition(1024, 1); |
| ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( |
| rDefinition, "decided", reservationId); |
| |
| ReservationSubmissionResponse submissionResponse = |
| interceptor.submitReservation(rSubmissionRequest); |
| Assert.assertNotNull(submissionResponse); |
| |
| SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); |
| Assert.assertNotNull(subClusterId); |
| Assert.assertTrue(subClusters.contains(subClusterId)); |
| } |
| |
| @Test |
| public void testSubmitReservationEmptyRequest() throws Exception { |
| LOG.info("Test FederationClientInterceptor : SubmitReservation request empty."); |
| |
| String errorMsg = |
| "Missing submitReservation request or reservationId or reservation definition or queue."; |
| |
| // null request1 |
| LambdaTestUtils.intercept(YarnException.class, errorMsg, |
| () -> interceptor.submitReservation(null)); |
| |
| // null request2 |
| ReservationSubmissionRequest request2 = |
| ReservationSubmissionRequest.newInstance(null, null, null); |
| LambdaTestUtils.intercept(YarnException.class, errorMsg, |
| () -> interceptor.submitReservation(request2)); |
| |
| // null request3 |
| ReservationSubmissionRequest request3 = |
| ReservationSubmissionRequest.newInstance(null, "q1", null); |
| LambdaTestUtils.intercept(YarnException.class, errorMsg, |
| () -> interceptor.submitReservation(request3)); |
| |
| // null request4 |
| ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); |
| ReservationSubmissionRequest request4 = |
| ReservationSubmissionRequest.newInstance(null, null, reservationId); |
| LambdaTestUtils.intercept(YarnException.class, errorMsg, |
| () -> interceptor.submitReservation(request4)); |
| |
| // null request5 |
| long arrival = Time.now(); |
| long deadline = arrival + (int)(DEFAULT_DURATION * 1.1); |
| |
| ReservationRequest rRequest = ReservationRequest.newInstance( |
| Resource.newInstance(1024, 1), 1, 1, DEFAULT_DURATION); |
| ReservationRequest[] rRequests = new ReservationRequest[] {rRequest}; |
| ReservationDefinition rDefinition = createReservationDefinition(arrival, deadline, rRequests, |
| ReservationRequestInterpreter.R_ALL, "u1"); |
| ReservationSubmissionRequest request5 = |
| ReservationSubmissionRequest.newInstance(rDefinition, null, reservationId); |
| LambdaTestUtils.intercept(YarnException.class, errorMsg, |
| () -> interceptor.submitReservation(request5)); |
| } |
| |
| @Test |
| public void testSubmitReservationMultipleSubmission() throws Exception { |
| LOG.info("Test FederationClientInterceptor: Submit Reservation - Multiple"); |
| |
| // get new reservationId |
| GetNewReservationRequest request = GetNewReservationRequest.newInstance(); |
| GetNewReservationResponse response = interceptor.getNewReservation(request); |
| Assert.assertNotNull(response); |
| |
| // First Submit Reservation |
| ReservationId reservationId = response.getReservationId(); |
| ReservationDefinition rDefinition = createReservationDefinition(1024, 1); |
| ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( |
| rDefinition, "decided", reservationId); |
| ReservationSubmissionResponse submissionResponse = |
| interceptor.submitReservation(rSubmissionRequest); |
| Assert.assertNotNull(submissionResponse); |
| |
| SubClusterId subClusterId1 = stateStoreUtil.queryReservationHomeSC(reservationId); |
| Assert.assertNotNull(subClusterId1); |
| Assert.assertTrue(subClusters.contains(subClusterId1)); |
| |
| // First Retry, repeat the submission |
| ReservationSubmissionResponse submissionResponse1 = |
| interceptor.submitReservation(rSubmissionRequest); |
| Assert.assertNotNull(submissionResponse1); |
| |
| // Expect reserved clusters to be consistent |
| SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId); |
| Assert.assertNotNull(subClusterId2); |
| Assert.assertEquals(subClusterId1, subClusterId2); |
| } |
| |
| @Test |
| public void testUpdateReservation() throws Exception { |
| LOG.info("Test FederationClientInterceptor : UpdateReservation request."); |
| |
| // get new reservationId |
| GetNewReservationRequest request = GetNewReservationRequest.newInstance(); |
| GetNewReservationResponse response = interceptor.getNewReservation(request); |
| Assert.assertNotNull(response); |
| |
| // allow plan follower to synchronize, manually trigger an assignment |
| Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); |
| for (MockRM mockRM : mockRMs.values()) { |
| ReservationSystem reservationSystem = mockRM.getReservationSystem(); |
| reservationSystem.synchronizePlan("root.decided", true); |
| } |
| |
| // Submit Reservation |
| ReservationId reservationId = response.getReservationId(); |
| ReservationDefinition rDefinition = createReservationDefinition(1024, 1); |
| ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( |
| rDefinition, "decided", reservationId); |
| |
| ReservationSubmissionResponse submissionResponse = |
| interceptor.submitReservation(rSubmissionRequest); |
| Assert.assertNotNull(submissionResponse); |
| |
| // Update Reservation |
| ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1); |
| ReservationUpdateRequest updateRequest = |
| ReservationUpdateRequest.newInstance(rDefinition2, reservationId); |
| ReservationUpdateResponse updateResponse = |
| interceptor.updateReservation(updateRequest); |
| Assert.assertNotNull(updateResponse); |
| |
| SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId); |
| Assert.assertNotNull(subClusterId); |
| } |
| |
| @Test |
| public void testDeleteReservation() throws Exception { |
| LOG.info("Test FederationClientInterceptor : DeleteReservation request."); |
| |
| // get new reservationId |
| GetNewReservationRequest request = GetNewReservationRequest.newInstance(); |
| GetNewReservationResponse response = interceptor.getNewReservation(request); |
| Assert.assertNotNull(response); |
| |
| // allow plan follower to synchronize, manually trigger an assignment |
| Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs(); |
| for (MockRM mockRM : mockRMs.values()) { |
| ReservationSystem reservationSystem = mockRM.getReservationSystem(); |
| reservationSystem.synchronizePlan("root.decided", true); |
| } |
| |
| // Submit Reservation |
| ReservationId reservationId = response.getReservationId(); |
| ReservationDefinition rDefinition = createReservationDefinition(1024, 1); |
| ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance( |
| rDefinition, "decided", reservationId); |
| |
| ReservationSubmissionResponse submissionResponse = |
| interceptor.submitReservation(rSubmissionRequest); |
| Assert.assertNotNull(submissionResponse); |
| |
| // Delete Reservation |
| ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance(reservationId); |
| ReservationDeleteResponse deleteResponse = interceptor.deleteReservation(deleteRequest); |
| Assert.assertNotNull(deleteResponse); |
| |
| LambdaTestUtils.intercept(YarnException.class, |
| "Reservation " + reservationId + " does not exist", |
| () -> stateStoreUtil.queryReservationHomeSC(reservationId)); |
| } |
| |
| |
| private ReservationDefinition createReservationDefinition(int memory, int core) { |
| // get reservationId |
| long arrival = Time.now(); |
| long deadline = arrival + (int)(DEFAULT_DURATION * 1.1); |
| |
| ReservationRequest rRequest = ReservationRequest.newInstance( |
| Resource.newInstance(memory, core), 1, 1, DEFAULT_DURATION); |
| ReservationRequest[] rRequests = new ReservationRequest[] {rRequest}; |
| |
| ReservationDefinition rDefinition = createReservationDefinition(arrival, deadline, rRequests, |
| ReservationRequestInterpreter.R_ALL, "u1"); |
| |
| return rDefinition; |
| } |
| |
| /** |
| * This method is used to create a ReservationDefinition. |
| * |
| * @param arrival Job arrival time |
| * @param deadline Job deadline |
| * @param reservationRequests reservationRequest Array |
| * @param rType Enumeration of various types of |
| * dependencies among multiple ReservationRequest |
| * @param username username |
| * @return ReservationDefinition |
| */ |
| private ReservationDefinition createReservationDefinition(long arrival, |
| long deadline, ReservationRequest[] reservationRequests, |
| ReservationRequestInterpreter rType, String username) { |
| ReservationRequests requests = ReservationRequests |
| .newInstance(Arrays.asList(reservationRequests), rType); |
| return ReservationDefinition.newInstance(arrival, deadline, |
| requests, username, "0", Priority.UNDEFINED); |
| } |
| } |