| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.client.api.impl; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.concurrent.TimeoutException; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.SecretManager.InvalidToken; |
| import org.apache.hadoop.service.Service.STATE; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.records.*; |
| import org.apache.hadoop.yarn.client.ClientRMProxy; |
| import org.apache.hadoop.yarn.client.api.AMRMClient; |
| import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; |
| import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException; |
| import org.apache.hadoop.yarn.client.api.NMClient; |
| import org.apache.hadoop.yarn.client.api.NMTokenCache; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.server.MiniYARNCluster; |
| import org.apache.hadoop.yarn.server.nodemanager.NodeManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.mortbay.log.Log; |
| |
| import com.google.common.base.Supplier; |
| |
| /** |
| * Test application master client class to resource manager. |
| */ |
| @RunWith(value = Parameterized.class) |
| public class TestAMRMClient { |
| private String schedulerName = null; |
| private boolean autoUpdate = false; |
| private Configuration conf = null; |
| private MiniYARNCluster yarnCluster = null; |
| private YarnClient yarnClient = null; |
| private List<NodeReport> nodeReports = null; |
| private ApplicationAttemptId attemptId = null; |
| private int nodeCount = 3; |
| |
| static final int rolling_interval_sec = 13; |
| static final long am_expire_ms = 4000; |
| |
| private Resource capability; |
| private Priority priority; |
| private Priority priority2; |
| private String node; |
| private String rack; |
| private String[] nodes; |
| private String[] racks; |
| private final static int DEFAULT_ITERATION = 3; |
| |
| public TestAMRMClient(String schedulerName, boolean autoUpdate) { |
| this.schedulerName = schedulerName; |
| this.autoUpdate = autoUpdate; |
| } |
| |
| @Parameterized.Parameters |
| public static Collection<Object[]> data() { |
| // Currently only capacity scheduler supports auto update. |
| return Arrays.asList(new Object[][] { |
| {CapacityScheduler.class.getName(), true}, |
| {CapacityScheduler.class.getName(), false}, |
| {FairScheduler.class.getName(), false} |
| }); |
| } |
| |
| @Before |
| public void setup() throws Exception { |
| conf = new YarnConfiguration(); |
| createClusterAndStartApplication(); |
| } |
| |
| private void createClusterAndStartApplication() throws Exception { |
| // start minicluster |
| this.conf = conf; |
| if (autoUpdate) { |
| conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); |
| } |
| conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); |
| conf.setLong( |
| YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, |
| rolling_interval_sec); |
| conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms); |
| conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); |
| // set the minimum allocation so that resource decrease can go under 1024 |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); |
| conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); |
| conf.setBoolean( |
| YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); |
| conf.setInt( |
| YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); |
| yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); |
| yarnCluster.init(conf); |
| yarnCluster.start(); |
| |
| // start rm client |
| yarnClient = YarnClient.createYarnClient(); |
| yarnClient.init(conf); |
| yarnClient.start(); |
| |
| // get node info |
| assertTrue("All node managers did not connect to the RM within the " |
| + "allotted 5-second timeout", |
| yarnCluster.waitForNodeManagersToConnect(5000L)); |
| nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); |
| assertEquals("Not all node managers were reported running", |
| nodeCount, nodeReports.size()); |
| |
| priority = Priority.newInstance(1); |
| priority2 = Priority.newInstance(2); |
| capability = Resource.newInstance(1024, 1); |
| |
| node = nodeReports.get(0).getNodeId().getHost(); |
| rack = nodeReports.get(0).getRackName(); |
| nodes = new String[]{ node }; |
| racks = new String[]{ rack }; |
| |
| // submit new app |
| ApplicationSubmissionContext appContext = |
| yarnClient.createApplication().getApplicationSubmissionContext(); |
| ApplicationId appId = appContext.getApplicationId(); |
| // set the application name |
| appContext.setApplicationName("Test"); |
| // Set the priority for the application master |
| Priority pri = Records.newRecord(Priority.class); |
| pri.setPriority(0); |
| appContext.setPriority(pri); |
| // Set the queue to which this application is to be submitted in the RM |
| appContext.setQueue("default"); |
| // Set up the container launch context for the application master |
| ContainerLaunchContext amContainer = |
| BuilderUtils.newContainerLaunchContext( |
| Collections.<String, LocalResource> emptyMap(), |
| new HashMap<String, String>(), Arrays.asList("sleep", "100"), |
| new HashMap<String, ByteBuffer>(), null, |
| new HashMap<ApplicationAccessType, String>()); |
| appContext.setAMContainerSpec(amContainer); |
| appContext.setResource(Resource.newInstance(1024, 1)); |
| // Create the request to send to the applications manager |
| SubmitApplicationRequest appRequest = Records |
| .newRecord(SubmitApplicationRequest.class); |
| appRequest.setApplicationSubmissionContext(appContext); |
| // Submit the application to the applications manager |
| yarnClient.submitApplication(appContext); |
| |
| // wait for app to start |
| RMAppAttempt appAttempt = null; |
| while (true) { |
| ApplicationReport appReport = yarnClient.getApplicationReport(appId); |
| if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { |
| attemptId = appReport.getCurrentApplicationAttemptId(); |
| appAttempt = |
| yarnCluster.getResourceManager().getRMContext().getRMApps() |
| .get(attemptId.getApplicationId()).getCurrentAppAttempt(); |
| while (true) { |
| if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { |
| break; |
| } |
| } |
| break; |
| } |
| } |
| // Just dig into the ResourceManager and get the AMRMToken just for the sake |
| // of testing. |
| UserGroupInformation.setLoginUser(UserGroupInformation |
| .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); |
| |
| // emulate RM setup of AMRM token in credentials by adding the token |
| // *before* setting the token service |
| UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); |
| appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); |
| } |
| |
| @After |
| public void teardown() throws YarnException, IOException { |
| yarnClient.killApplication(attemptId.getApplicationId()); |
| attemptId = null; |
| |
| if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { |
| yarnClient.stop(); |
| } |
| if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) { |
| yarnCluster.stop(); |
| } |
| } |
| |
| @Test (timeout = 60000) |
| public void testAMRMClientNoMatchingRequests() |
| throws IOException, YarnException { |
| AMRMClient<ContainerRequest> amClient = AMRMClient.createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| Resource testCapability1 = Resource.newInstance(1024, 2); |
| List<? extends Collection<ContainerRequest>> matches = |
| amClient.getMatchingRequests(priority, node, testCapability1); |
| assertEquals("Expected no macthing requests.", matches.size(), 0); |
| } |
| |
| @Test (timeout=60000) |
| public void testAMRMClientMatchingFit() throws YarnException, IOException { |
| AMRMClient<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = AMRMClient.<ContainerRequest>createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| Resource capability1 = Resource.newInstance(1024, 2); |
| Resource capability2 = Resource.newInstance(1024, 1); |
| Resource capability3 = Resource.newInstance(1000, 2); |
| Resource capability4 = Resource.newInstance(2000, 1); |
| Resource capability5 = Resource.newInstance(1000, 3); |
| Resource capability6 = Resource.newInstance(2000, 1); |
| Resource capability7 = Resource.newInstance(2000, 1); |
| |
| ContainerRequest storedContainer1 = |
| new ContainerRequest(capability1, nodes, racks, priority); |
| ContainerRequest storedContainer2 = |
| new ContainerRequest(capability2, nodes, racks, priority); |
| ContainerRequest storedContainer3 = |
| new ContainerRequest(capability3, nodes, racks, priority); |
| ContainerRequest storedContainer4 = |
| new ContainerRequest(capability4, nodes, racks, priority); |
| ContainerRequest storedContainer5 = |
| new ContainerRequest(capability5, nodes, racks, priority); |
| ContainerRequest storedContainer6 = |
| new ContainerRequest(capability6, nodes, racks, priority); |
| ContainerRequest storedContainer7 = |
| new ContainerRequest(capability7, nodes, racks, priority2, false); |
| amClient.addContainerRequest(storedContainer1); |
| amClient.addContainerRequest(storedContainer2); |
| amClient.addContainerRequest(storedContainer3); |
| amClient.addContainerRequest(storedContainer4); |
| amClient.addContainerRequest(storedContainer5); |
| amClient.addContainerRequest(storedContainer6); |
| amClient.addContainerRequest(storedContainer7); |
| |
| // Add some CRs with allocReqIds... These will not be returned by |
| // the default getMatchingRequests |
| ContainerRequest storedContainer11 = |
| new ContainerRequest(capability1, nodes, racks, priority, 1); |
| ContainerRequest storedContainer33 = |
| new ContainerRequest(capability3, nodes, racks, priority, 3); |
| ContainerRequest storedContainer43 = |
| new ContainerRequest(capability4, nodes, racks, priority, 3); |
| amClient.addContainerRequest(storedContainer11); |
| amClient.addContainerRequest(storedContainer33); |
| amClient.addContainerRequest(storedContainer43); |
| |
| // test matching of containers |
| List<? extends Collection<ContainerRequest>> matches; |
| ContainerRequest storedRequest; |
| // exact match |
| Resource testCapability1 = Resource.newInstance(1024, 2); |
| matches = amClient.getMatchingRequests(priority, node, testCapability1); |
| verifyMatches(matches, 1); |
| storedRequest = matches.get(0).iterator().next(); |
| assertEquals(storedContainer1, storedRequest); |
| amClient.removeContainerRequest(storedContainer1); |
| |
| // exact match for allocReqId 1 |
| Collection<ContainerRequest> reqIdMatches = |
| amClient.getMatchingRequests(1); |
| assertEquals(1, reqIdMatches.size()); |
| storedRequest = reqIdMatches.iterator().next(); |
| assertEquals(storedContainer11, storedRequest); |
| amClient.removeContainerRequest(storedContainer11); |
| |
| // exact match for allocReqId 3 |
| reqIdMatches = amClient.getMatchingRequests(3); |
| assertEquals(2, reqIdMatches.size()); |
| Iterator<ContainerRequest> iter = reqIdMatches.iterator(); |
| storedRequest = iter.next(); |
| assertEquals(storedContainer43, storedRequest); |
| amClient.removeContainerRequest(storedContainer43); |
| storedRequest = iter.next(); |
| assertEquals(storedContainer33, storedRequest); |
| amClient.removeContainerRequest(storedContainer33); |
| |
| // exact matching with order maintained |
| Resource testCapability2 = Resource.newInstance(2000, 1); |
| matches = amClient.getMatchingRequests(priority, node, testCapability2); |
| verifyMatches(matches, 2); |
| // must be returned in the order they were made |
| int i = 0; |
| for(ContainerRequest storedRequest1 : matches.get(0)) { |
| if(i++ == 0) { |
| assertEquals(storedContainer4, storedRequest1); |
| } else { |
| assertEquals(storedContainer6, storedRequest1); |
| } |
| } |
| amClient.removeContainerRequest(storedContainer6); |
| |
| // matching with larger container. all requests returned |
| Resource testCapability3 = Resource.newInstance(4000, 4); |
| matches = amClient.getMatchingRequests(priority, node, testCapability3); |
| assert(matches.size() == 4); |
| |
| Resource testCapability4 = Resource.newInstance(1024, 2); |
| matches = amClient.getMatchingRequests(priority, node, testCapability4); |
| assert(matches.size() == 2); |
| // verify non-fitting containers are not returned and fitting ones are |
| for(Collection<ContainerRequest> testSet : matches) { |
| assertEquals(1, testSet.size()); |
| ContainerRequest testRequest = testSet.iterator().next(); |
| assertTrue(testRequest != storedContainer4); |
| assertTrue(testRequest != storedContainer5); |
| assert(testRequest == storedContainer2 || |
| testRequest == storedContainer3); |
| } |
| |
| Resource testCapability5 = Resource.newInstance(512, 4); |
| matches = amClient.getMatchingRequests(priority, node, testCapability5); |
| assert(matches.size() == 0); |
| |
| // verify requests without relaxed locality are only returned at specific |
| // locations |
| Resource testCapability7 = Resource.newInstance(2000, 1); |
| matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY, |
| testCapability7); |
| assert(matches.size() == 0); |
| matches = amClient.getMatchingRequests(priority2, node, testCapability7); |
| assert(matches.size() == 1); |
| |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| /** |
| * Test fit of both GUARANTEED and OPPORTUNISTIC containers. |
| */ |
| @Test (timeout=60000) |
| public void testAMRMClientMatchingFitExecType() |
| throws YarnException, IOException { |
| AMRMClient<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = AMRMClient.<ContainerRequest>createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| Resource capability1 = Resource.newInstance(1024, 2); |
| Resource capability2 = Resource.newInstance(1024, 1); |
| Resource capability3 = Resource.newInstance(1000, 2); |
| Resource capability4 = Resource.newInstance(1000, 2); |
| Resource capability5 = Resource.newInstance(2000, 2); |
| Resource capability6 = Resource.newInstance(2000, 3); |
| Resource capability7 = Resource.newInstance(6000, 3); |
| |
| // Add 2 GUARANTEED and 7 OPPORTUNISTIC requests. |
| ContainerRequest storedGuarContainer1 = |
| new ContainerRequest(capability1, nodes, racks, priority); |
| ContainerRequest storedGuarContainer2 = |
| new ContainerRequest(capability2, nodes, racks, priority); |
| ContainerRequest storedOpportContainer1 = |
| new ContainerRequest(capability1, nodes, racks, priority, |
| 0, true, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| ContainerRequest storedOpportContainer2 = |
| new ContainerRequest(capability2, nodes, racks, priority, |
| 0, true, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| ContainerRequest storedOpportContainer3 = |
| new ContainerRequest(capability3, nodes, racks, priority, |
| 0, true, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| ContainerRequest storedOpportContainer4 = |
| new ContainerRequest(capability4, nodes, racks, priority, |
| 0, true, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| ContainerRequest storedOpportContainer5 = |
| new ContainerRequest(capability5, nodes, racks, priority, |
| 0, true, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| ContainerRequest storedOpportContainer6 = |
| new ContainerRequest(capability6, nodes, racks, priority, |
| 0, true, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| ContainerRequest storedOpportContainer7 = |
| new ContainerRequest(capability7, nodes, racks, priority2, |
| 0, false, null, |
| ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)); |
| amClient.addContainerRequest(storedGuarContainer1); |
| amClient.addContainerRequest(storedGuarContainer2); |
| amClient.addContainerRequest(storedOpportContainer1); |
| amClient.addContainerRequest(storedOpportContainer2); |
| amClient.addContainerRequest(storedOpportContainer3); |
| amClient.addContainerRequest(storedOpportContainer4); |
| amClient.addContainerRequest(storedOpportContainer5); |
| amClient.addContainerRequest(storedOpportContainer6); |
| amClient.addContainerRequest(storedOpportContainer7); |
| |
| // Make sure 3 entries are generated in the ask list for each added |
| // container request of a given capability, locality, execution type and |
| // priority (one node-local, one rack-local, and one ANY). |
| assertEquals(24, |
| (((AMRMClientImpl<ContainerRequest>) amClient).ask.size())); |
| |
| // test exact matching of GUARANTEED containers |
| List<? extends Collection<ContainerRequest>> matches; |
| ContainerRequest storedRequest; |
| Resource testCapability1 = Resource.newInstance(1024, 2); |
| matches = amClient |
| .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, |
| testCapability1); |
| verifyMatches(matches, 1); |
| storedRequest = matches.get(0).iterator().next(); |
| assertEquals(storedGuarContainer1, storedRequest); |
| amClient.removeContainerRequest(storedGuarContainer1); |
| |
| // test exact matching of OPPORTUNISTIC containers |
| matches = amClient.getMatchingRequests(priority, node, |
| ExecutionType.OPPORTUNISTIC, testCapability1); |
| verifyMatches(matches, 1); |
| storedRequest = matches.get(0).iterator().next(); |
| assertEquals(storedOpportContainer1, storedRequest); |
| amClient.removeContainerRequest(storedOpportContainer1); |
| |
| // exact OPPORTUNISTIC matching with order maintained |
| Resource testCapability2 = Resource.newInstance(1000, 2); |
| matches = amClient.getMatchingRequests(priority, node, |
| ExecutionType.OPPORTUNISTIC, testCapability2); |
| verifyMatches(matches, 2); |
| // must be returned in the order they were made |
| int i = 0; |
| for(ContainerRequest storedRequest1 : matches.get(0)) { |
| if(i++ == 0) { |
| assertEquals(storedOpportContainer3, storedRequest1); |
| } else { |
| assertEquals(storedOpportContainer4, storedRequest1); |
| } |
| } |
| amClient.removeContainerRequest(storedOpportContainer3); |
| |
| // matching with larger container |
| Resource testCapability3 = Resource.newInstance(4000, 4); |
| matches = amClient.getMatchingRequests(priority, node, |
| ExecutionType.OPPORTUNISTIC, testCapability3); |
| assert(matches.size() == 4); |
| |
| // verify requests without relaxed locality are only returned at specific |
| // locations |
| Resource testCapability4 = Resource.newInstance(6000, 3); |
| matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY, |
| ExecutionType.OPPORTUNISTIC, testCapability4); |
| assert(matches.size() == 0); |
| matches = amClient.getMatchingRequests(priority2, node, |
| ExecutionType.OPPORTUNISTIC, testCapability4); |
| assert(matches.size() == 1); |
| |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| private void verifyMatches( |
| List<? extends Collection<ContainerRequest>> matches, |
| int matchSize) { |
| assertEquals(1, matches.size()); |
| assertEquals(matchSize, matches.get(0).size()); |
| } |
| |
| @Test (timeout=60000) |
| public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = new AMRMClientImpl<ContainerRequest>(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| Resource capability = Resource.newInstance(1024, 2); |
| |
| ContainerRequest storedContainer1 = |
| new ContainerRequest(capability, nodes, null, priority); |
| amClient.addContainerRequest(storedContainer1); |
| |
| // verify matching with original node and inferred rack |
| List<? extends Collection<ContainerRequest>> matches; |
| ContainerRequest storedRequest; |
| // exact match node |
| matches = amClient.getMatchingRequests(priority, node, capability); |
| verifyMatches(matches, 1); |
| storedRequest = matches.get(0).iterator().next(); |
| assertEquals(storedContainer1, storedRequest); |
| // inferred match rack |
| matches = amClient.getMatchingRequests(priority, rack, capability); |
| verifyMatches(matches, 1); |
| storedRequest = matches.get(0).iterator().next(); |
| assertEquals(storedContainer1, storedRequest); |
| |
| // inferred rack match no longer valid after request is removed |
| amClient.removeContainerRequest(storedContainer1); |
| matches = amClient.getMatchingRequests(priority, rack, capability); |
| assertTrue(matches.isEmpty()); |
| |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| @Test //(timeout=60000) |
| public void testAMRMClientMatchStorage() throws YarnException, IOException { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = |
| (AMRMClientImpl<ContainerRequest>) AMRMClient |
| .<ContainerRequest> createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| Priority priority1 = Records.newRecord(Priority.class); |
| priority1.setPriority(2); |
| |
| ContainerRequest storedContainer1 = |
| new ContainerRequest(capability, nodes, racks, priority); |
| ContainerRequest storedContainer2 = |
| new ContainerRequest(capability, nodes, racks, priority); |
| ContainerRequest storedContainer3 = |
| new ContainerRequest(capability, null, null, priority1); |
| amClient.addContainerRequest(storedContainer1); |
| amClient.addContainerRequest(storedContainer2); |
| amClient.addContainerRequest(storedContainer3); |
| |
| // test addition and storage |
| RemoteRequestsTable<ContainerRequest> remoteRequestsTable = |
| amClient.getTable(0); |
| int containersRequestedAny = remoteRequestsTable.get(priority, |
| ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) |
| .remoteRequest.getNumContainers(); |
| assertEquals(2, containersRequestedAny); |
| containersRequestedAny = remoteRequestsTable.get(priority1, |
| ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) |
| .remoteRequest.getNumContainers(); |
| assertEquals(1, containersRequestedAny); |
| List<? extends Collection<ContainerRequest>> matches = |
| amClient.getMatchingRequests(priority, node, capability); |
| verifyMatches(matches, 2); |
| matches = amClient.getMatchingRequests(priority, rack, capability); |
| verifyMatches(matches, 2); |
| matches = |
| amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); |
| verifyMatches(matches, 2); |
| matches = amClient.getMatchingRequests(priority1, rack, capability); |
| assertTrue(matches.isEmpty()); |
| matches = |
| amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); |
| verifyMatches(matches, 1); |
| |
| // test removal |
| amClient.removeContainerRequest(storedContainer3); |
| matches = amClient.getMatchingRequests(priority, node, capability); |
| verifyMatches(matches, 2); |
| amClient.removeContainerRequest(storedContainer2); |
| matches = amClient.getMatchingRequests(priority, node, capability); |
| verifyMatches(matches, 1); |
| matches = amClient.getMatchingRequests(priority, rack, capability); |
| verifyMatches(matches, 1); |
| |
| // test matching of containers |
| ContainerRequest storedRequest = matches.get(0).iterator().next(); |
| assertEquals(storedContainer1, storedRequest); |
| amClient.removeContainerRequest(storedContainer1); |
| matches = |
| amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); |
| assertTrue(matches.isEmpty()); |
| matches = |
| amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); |
| assertTrue(matches.isEmpty()); |
| // 0 requests left. everything got cleaned up |
| assertTrue(amClient.getTable(0).isEmpty()); |
| |
| // go through an exemplary allocation, matching and release cycle |
| amClient.addContainerRequest(storedContainer1); |
| amClient.addContainerRequest(storedContainer3); |
| // RM should allocate container within 2 calls to allocate() |
| int allocatedContainerCount = 0; |
| int iterationsLeft = 3; |
| while (allocatedContainerCount < 2 |
| && iterationsLeft-- > 0) { |
| Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft); |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| assertEquals(nodeCount, amClient.getClusterNodeCount()); |
| allocatedContainerCount += allocResponse.getAllocatedContainers().size(); |
| for(Container container : allocResponse.getAllocatedContainers()) { |
| ContainerRequest expectedRequest = |
| container.getPriority().equals(storedContainer1.getPriority()) ? |
| storedContainer1 : storedContainer3; |
| matches = amClient.getMatchingRequests(container.getPriority(), |
| ResourceRequest.ANY, |
| container.getResource()); |
| // test correct matched container is returned |
| verifyMatches(matches, 1); |
| ContainerRequest matchedRequest = matches.get(0).iterator().next(); |
| assertEquals(matchedRequest, expectedRequest); |
| amClient.removeContainerRequest(matchedRequest); |
| // assign this container, use it and release it |
| amClient.releaseAssignedContainer(container.getId()); |
| } |
| if(allocatedContainerCount < containersRequestedAny) { |
| // let NM heartbeat to RM and trigger allocations |
| triggerSchedulingWithNMHeartBeat(); |
| } |
| } |
| |
| assertEquals(2, allocatedContainerCount); |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| assertEquals(0, amClient.release.size()); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, allocResponse.getAllocatedContainers().size()); |
| // 0 requests left. everything got cleaned up |
| assertTrue(remoteRequestsTable.isEmpty()); |
| |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| /** |
| * Make sure we get allocations regardless of timing issues. |
| */ |
| private void triggerSchedulingWithNMHeartBeat() { |
| // Simulate fair scheduler update thread |
| RMContext context = yarnCluster.getResourceManager().getRMContext(); |
| if (context.getScheduler() instanceof FairScheduler) { |
| FairScheduler scheduler = (FairScheduler)context.getScheduler(); |
| scheduler.update(); |
| } |
| // Trigger NM's heartbeat to RM and trigger allocations |
| for (RMNode rmNode : context.getRMNodes().values()) { |
| context.getScheduler().handle(new NodeUpdateSchedulerEvent(rmNode)); |
| } |
| if (context.getScheduler() instanceof FairScheduler) { |
| FairScheduler scheduler = (FairScheduler)context.getScheduler(); |
| scheduler.update(); |
| } |
| } |
| |
| @Test (timeout=60000) |
| public void testAllocationWithBlacklist() throws YarnException, IOException { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = |
| (AMRMClientImpl<ContainerRequest>) AMRMClient |
| .<ContainerRequest> createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| ContainerRequest storedContainer1 = |
| new ContainerRequest(capability, nodes, racks, priority); |
| amClient.addContainerRequest(storedContainer1); |
| assertEquals(3, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| List<String> localNodeBlacklist = new ArrayList<String>(); |
| localNodeBlacklist.add(node); |
| |
| // put node in black list, so no container assignment |
| amClient.updateBlacklist(localNodeBlacklist, null); |
| |
| int allocatedContainerCount = getAllocatedContainersNumber(amClient, |
| DEFAULT_ITERATION); |
| // the only node is in blacklist, so no allocation |
| assertEquals(0, allocatedContainerCount); |
| |
| // Remove node from blacklist, so get assigned with 2 |
| amClient.updateBlacklist(null, localNodeBlacklist); |
| ContainerRequest storedContainer2 = |
| new ContainerRequest(capability, nodes, racks, priority); |
| amClient.addContainerRequest(storedContainer2); |
| allocatedContainerCount = getAllocatedContainersNumber(amClient, |
| DEFAULT_ITERATION); |
| assertEquals(2, allocatedContainerCount); |
| |
| // Test in case exception in allocate(), blacklist is kept |
| assertTrue(amClient.blacklistAdditions.isEmpty()); |
| assertTrue(amClient.blacklistRemovals.isEmpty()); |
| |
| // create a invalid ContainerRequest - memory value is minus |
| ContainerRequest invalidContainerRequest = |
| new ContainerRequest(Resource.newInstance(-1024, 1), |
| nodes, racks, priority); |
| amClient.addContainerRequest(invalidContainerRequest); |
| amClient.updateBlacklist(localNodeBlacklist, null); |
| try { |
| // allocate() should complain as ContainerRequest is invalid. |
| amClient.allocate(0.1f); |
| fail("there should be an exception here."); |
| } catch (Exception e) { |
| assertEquals(1, amClient.blacklistAdditions.size()); |
| } |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| @Test (timeout=60000) |
| public void testAMRMClientWithBlacklist() throws YarnException, IOException { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = |
| (AMRMClientImpl<ContainerRequest>) AMRMClient |
| .<ContainerRequest> createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| String[] nodes = {"node1", "node2", "node3"}; |
| |
| // Add nodes[0] and nodes[1] |
| List<String> nodeList01 = new ArrayList<String>(); |
| nodeList01.add(nodes[0]); |
| nodeList01.add(nodes[1]); |
| amClient.updateBlacklist(nodeList01, null); |
| assertEquals(2, amClient.blacklistAdditions.size()); |
| assertEquals(0, amClient.blacklistRemovals.size()); |
| |
| // Add nodes[0] again, verify it is not added duplicated. |
| List<String> nodeList02 = new ArrayList<String>(); |
| nodeList02.add(nodes[0]); |
| nodeList02.add(nodes[2]); |
| amClient.updateBlacklist(nodeList02, null); |
| assertEquals(3, amClient.blacklistAdditions.size()); |
| assertEquals(0, amClient.blacklistRemovals.size()); |
| |
| // Add nodes[1] and nodes[2] to removal list, |
| // Verify addition list remove these two nodes. |
| List<String> nodeList12 = new ArrayList<String>(); |
| nodeList12.add(nodes[1]); |
| nodeList12.add(nodes[2]); |
| amClient.updateBlacklist(null, nodeList12); |
| assertEquals(1, amClient.blacklistAdditions.size()); |
| assertEquals(2, amClient.blacklistRemovals.size()); |
| |
| // Add nodes[1] again to addition list, |
| // Verify removal list will remove this node. |
| List<String> nodeList1 = new ArrayList<String>(); |
| nodeList1.add(nodes[1]); |
| amClient.updateBlacklist(nodeList1, null); |
| assertEquals(2, amClient.blacklistAdditions.size()); |
| assertEquals(1, amClient.blacklistRemovals.size()); |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| private int getAllocatedContainersNumber( |
| AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft) |
| throws YarnException, IOException { |
| int allocatedContainerCount = 0; |
| while (iterationsLeft-- > 0) { |
| Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft); |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| assertEquals(nodeCount, amClient.getClusterNodeCount()); |
| allocatedContainerCount += allocResponse.getAllocatedContainers().size(); |
| |
| if(allocatedContainerCount == 0) { |
| // let NM heartbeat to RM and trigger allocations |
| triggerSchedulingWithNMHeartBeat(); |
| } |
| } |
| return allocatedContainerCount; |
| } |
| |
| @Test (timeout=60000) |
| public void testAMRMClient() throws YarnException, IOException { |
| initAMRMClientAndTest(false); |
| } |
| |
| @Test (timeout=60000) |
| public void testAMRMClientAllocReqId() throws YarnException, IOException { |
| initAMRMClientAndTest(true); |
| } |
| |
| @Test (timeout=60000) |
| public void testAMRMClientWithSaslEncryption() throws Exception { |
| // we have to create a new instance of MiniYARNCluster to avoid SASL qop |
| // mismatches between client and server |
| teardown(); |
| conf = new YarnConfiguration(); |
| conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, "privacy"); |
| createClusterAndStartApplication(); |
| initAMRMClientAndTest(false); |
| } |
| |
| private void initAMRMClientAndTest(boolean useAllocReqId) |
| throws YarnException, IOException { |
| AMRMClient<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = AMRMClient.<ContainerRequest>createAMRMClient(); |
| |
| //setting an instance NMTokenCache |
| amClient.setNMTokenCache(new NMTokenCache()); |
| //asserting we are not using the singleton instance cache |
| Assert.assertNotSame(NMTokenCache.getSingleton(), |
| amClient.getNMTokenCache()); |
| |
| amClient.init(conf); |
| amClient.start(); |
| |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| if (useAllocReqId) { |
| testAllocRequestId((AMRMClientImpl<ContainerRequest>)amClient); |
| } else { |
| testAllocation((AMRMClientImpl<ContainerRequest>) amClient); |
| } |
| |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| @Test(timeout=30000) |
| public void testAskWithNodeLabels() { |
| AMRMClientImpl<ContainerRequest> client = |
| new AMRMClientImpl<ContainerRequest>(); |
| |
| // add exp=x to ANY |
| client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, |
| 1), null, null, Priority.UNDEFINED, true, "x")); |
| assertEquals(1, client.ask.size()); |
| assertEquals("x", client.ask.iterator().next() |
| .getNodeLabelExpression()); |
| |
| // add exp=x then add exp=a to ANY in same priority, only exp=a should kept |
| client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, |
| 1), null, null, Priority.UNDEFINED, true, "x")); |
| client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, |
| 1), null, null, Priority.UNDEFINED, true, "a")); |
| assertEquals(1, client.ask.size()); |
| assertEquals("a", client.ask.iterator().next() |
| .getNodeLabelExpression()); |
| |
| // add exp=x to ANY, rack and node, only resource request has ANY resource |
| // name will be assigned the label expression |
| // add exp=x then add exp=a to ANY in same priority, only exp=a should kept |
| client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024, |
| 1), null, null, Priority.UNDEFINED, true, |
| "y")); |
| assertEquals(1, client.ask.size()); |
| for (ResourceRequest req : client.ask) { |
| if (ResourceRequest.ANY.equals(req.getResourceName())) { |
| assertEquals("y", req.getNodeLabelExpression()); |
| } else { |
| Assert.assertNull(req.getNodeLabelExpression()); |
| } |
| } |
| // set container with nodes and racks with labels |
| client.addContainerRequest(new ContainerRequest( |
| Resource.newInstance(1024, 1), new String[] { "rack1" }, |
| new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y")); |
| for (ResourceRequest req : client.ask) { |
| if (ResourceRequest.ANY.equals(req.getResourceName())) { |
| assertEquals("y", req.getNodeLabelExpression()); |
| } else { |
| Assert.assertNull(req.getNodeLabelExpression()); |
| } |
| } |
| } |
| |
| private void verifyAddRequestFailed(AMRMClient<ContainerRequest> client, |
| ContainerRequest request) { |
| try { |
| client.addContainerRequest(request); |
| } catch (InvalidContainerRequestException e) { |
| return; |
| } |
| fail(); |
| } |
| |
| @Test(timeout=30000) |
| public void testAskWithInvalidNodeLabels() { |
| AMRMClientImpl<ContainerRequest> client = |
| new AMRMClientImpl<ContainerRequest>(); |
| |
| // specified exp with more than one node labels |
| verifyAddRequestFailed(client, |
| new ContainerRequest(Resource.newInstance(1024, 1), null, null, |
| Priority.UNDEFINED, true, "x && y")); |
| } |
| |
| @Test(timeout=60000) |
| public void testAMRMClientWithContainerResourceChange() |
| throws YarnException, IOException { |
| // Fair scheduler does not support resource change |
| Assume.assumeTrue(schedulerName.equals(CapacityScheduler.class.getName())); |
| AMRMClient<ContainerRequest> amClient = null; |
| try { |
| // start am rm client |
| amClient = AMRMClient.createAMRMClient(); |
| Assert.assertNotNull(amClient); |
| // asserting we are using the singleton instance cache |
| Assert.assertSame( |
| NMTokenCache.getSingleton(), amClient.getNMTokenCache()); |
| amClient.init(conf); |
| amClient.start(); |
| assertEquals(STATE.STARTED, amClient.getServiceState()); |
| // start am nm client |
| NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); |
| Assert.assertNotNull(nmClient); |
| // asserting we are using the singleton instance cache |
| Assert.assertSame( |
| NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); |
| nmClient.init(conf); |
| nmClient.start(); |
| assertEquals(STATE.STARTED, nmClient.getServiceState()); |
| // am rm client register the application master with RM |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| // allocate three containers and make sure they are in RUNNING state |
| List<Container> containers = |
| allocateAndStartContainers(amClient, nmClient, 3); |
| // perform container resource increase and decrease tests |
| doContainerResourceChange(amClient, containers); |
| // unregister and finish up the test |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| private List<Container> allocateAndStartContainers( |
| final AMRMClient<ContainerRequest> amClient, final NMClient nmClient, |
| int num) throws YarnException, IOException { |
| // set up allocation requests |
| for (int i = 0; i < num; ++i) { |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| } |
| // send allocation requests |
| amClient.allocate(0.1f); |
| // let NM heartbeat to RM and trigger allocations |
| triggerSchedulingWithNMHeartBeat(); |
| // get allocations |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| List<Container> containers = allocResponse.getAllocatedContainers(); |
| assertEquals(num, containers.size()); |
| |
| // build container launch context |
| Credentials ts = new Credentials(); |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| ts.writeTokenStorageToStream(dob); |
| ByteBuffer securityTokens = |
| ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| // start a process long enough for increase/decrease action to take effect |
| ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext( |
| Collections.<String, LocalResource>emptyMap(), |
| new HashMap<String, String>(), Arrays.asList("sleep", "100"), |
| new HashMap<String, ByteBuffer>(), securityTokens, |
| new HashMap<ApplicationAccessType, String>()); |
| // start the containers and make sure they are in RUNNING state |
| try { |
| for (int i = 0; i < num; i++) { |
| Container container = containers.get(i); |
| nmClient.startContainer(container, clc); |
| // NodeManager may still need some time to get the stable |
| // container status |
| while (true) { |
| ContainerStatus status = nmClient.getContainerStatus( |
| container.getId(), container.getNodeId()); |
| if (status.getState() == ContainerState.RUNNING) { |
| break; |
| } |
| sleep(10); |
| } |
| } |
| } catch (YarnException e) { |
| throw new AssertionError("Exception is not expected: " + e); |
| } |
| // let NM's heartbeat to RM to confirm container launch |
| triggerSchedulingWithNMHeartBeat(); |
| return containers; |
| } |
| |
| |
| private void doContainerResourceChange( |
| final AMRMClient<ContainerRequest> amClient, List<Container> containers) |
| throws YarnException, IOException { |
| assertEquals(3, containers.size()); |
| // remember the container IDs |
| Container container1 = containers.get(0); |
| Container container2 = containers.get(1); |
| Container container3 = containers.get(2); |
| AMRMClientImpl<ContainerRequest> amClientImpl = |
| (AMRMClientImpl<ContainerRequest>) amClient; |
| assertEquals(0, amClientImpl.change.size()); |
| // verify newer request overwrites older request for the container1 |
| amClientImpl.requestContainerUpdate(container1, |
| UpdateContainerRequest.newInstance(container1.getVersion(), |
| container1.getId(), ContainerUpdateType.INCREASE_RESOURCE, |
| Resource.newInstance(2048, 1), null)); |
| amClientImpl.requestContainerUpdate(container1, |
| UpdateContainerRequest.newInstance(container1.getVersion(), |
| container1.getId(), ContainerUpdateType.INCREASE_RESOURCE, |
| Resource.newInstance(4096, 1), null)); |
| assertEquals(Resource.newInstance(4096, 1), |
| amClientImpl.change.get(container1.getId()).getValue().getCapability()); |
| // verify new decrease request cancels old increase request for container1 |
| amClientImpl.requestContainerUpdate(container1, |
| UpdateContainerRequest.newInstance(container1.getVersion(), |
| container1.getId(), ContainerUpdateType.DECREASE_RESOURCE, |
| Resource.newInstance(512, 1), null)); |
| assertEquals(Resource.newInstance(512, 1), |
| amClientImpl.change.get(container1.getId()).getValue().getCapability()); |
| // request resource increase for container2 |
| amClientImpl.requestContainerUpdate(container2, |
| UpdateContainerRequest.newInstance(container2.getVersion(), |
| container2.getId(), ContainerUpdateType.INCREASE_RESOURCE, |
| Resource.newInstance(2048, 1), null)); |
| assertEquals(Resource.newInstance(2048, 1), |
| amClientImpl.change.get(container2.getId()).getValue().getCapability()); |
| // verify release request will cancel pending change requests for the same |
| // container |
| amClientImpl.requestContainerUpdate(container3, |
| UpdateContainerRequest.newInstance(container3.getVersion(), |
| container3.getId(), ContainerUpdateType.INCREASE_RESOURCE, |
| Resource.newInstance(2048, 1), null)); |
| assertEquals(3, amClientImpl.pendingChange.size()); |
| amClientImpl.releaseAssignedContainer(container3.getId()); |
| assertEquals(2, amClientImpl.pendingChange.size()); |
| // as of now: container1 asks to decrease to (512, 1) |
| // container2 asks to increase to (2048, 1) |
| // send allocation requests |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| assertEquals(0, amClientImpl.change.size()); |
| // we should get decrease confirmation right away |
| List<UpdatedContainer> updatedContainers = |
| allocResponse.getUpdatedContainers(); |
| assertEquals(1, updatedContainers.size()); |
| // we should get increase allocation after the next NM's heartbeat to RM |
| triggerSchedulingWithNMHeartBeat(); |
| // get allocations |
| allocResponse = amClient.allocate(0.1f); |
| updatedContainers = |
| allocResponse.getUpdatedContainers(); |
| assertEquals(1, updatedContainers.size()); |
| } |
| |
| @Test |
| public void testAMRMContainerPromotionAndDemotionWithAutoUpdate() |
| throws Exception { |
| AMRMClientImpl<AMRMClient.ContainerRequest> amClient = |
| (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient |
| .createAMRMClient(); |
| amClient.init(conf); |
| amClient.start(); |
| |
| // start am nm client |
| NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); |
| Assert.assertNotNull(nmClient); |
| nmClient.init(conf); |
| nmClient.start(); |
| assertEquals(STATE.STARTED, nmClient.getServiceState()); |
| |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| // setup container request |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| // START OPPORTUNISTIC Container, Send allocation request to RM |
| Resource reqResource = Resource.newInstance(512, 1); |
| amClient.addContainerRequest( |
| new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0, |
| true, null, ExecutionTypeRequest |
| .newInstance(ExecutionType.OPPORTUNISTIC, true))); |
| |
| // RM should allocate container within 1 calls to allocate() |
| AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0); |
| |
| assertEquals(1, allocResponse.getAllocatedContainers().size()); |
| startContainer(allocResponse, nmClient); |
| |
| Container c = allocResponse.getAllocatedContainers().get(0); |
| amClient.requestContainerUpdate(c, |
| UpdateContainerRequest.newInstance(c.getVersion(), |
| c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED)); |
| |
| allocResponse = waitForAllocation(amClient, 0, 1); |
| |
| // Make sure container is updated. |
| UpdatedContainer updatedContainer = allocResponse |
| .getUpdatedContainers().get(0); |
| |
| // If container auto update is not enabled, we need to notify |
| // NM about this update. |
| if (!autoUpdate) { |
| nmClient.updateContainerResource(updatedContainer.getContainer()); |
| } |
| |
| // Wait until NM context updated, or fail on timeout. |
| waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED); |
| |
| // Once promoted, demote it back to OPPORTUNISTIC |
| amClient.requestContainerUpdate(updatedContainer.getContainer(), |
| UpdateContainerRequest.newInstance( |
| updatedContainer.getContainer().getVersion(), |
| updatedContainer.getContainer().getId(), |
| ContainerUpdateType.DEMOTE_EXECUTION_TYPE, |
| null, ExecutionType.OPPORTUNISTIC)); |
| |
| allocResponse = waitForAllocation(amClient, 0, 1); |
| |
| // Make sure container is updated. |
| updatedContainer = allocResponse.getUpdatedContainers().get(0); |
| |
| if (!autoUpdate) { |
| nmClient.updateContainerResource(updatedContainer.getContainer()); |
| } |
| |
| // Wait until NM context updated, or fail on timeout. |
| waitForNMContextUpdate(updatedContainer, ExecutionType.OPPORTUNISTIC); |
| |
| amClient.close(); |
| } |
| |
| private AllocateResponse waitForAllocation(AMRMClient amrmClient, |
| int expectedAllocatedContainerNum, int expectedUpdatedContainerNum) |
| throws Exception { |
| AllocateResponse allocResponse = null; |
| int iteration = 100; |
| while(iteration>0) { |
| allocResponse = amrmClient.allocate(0.1f); |
| int actualAllocated = allocResponse.getAllocatedContainers().size(); |
| int actualUpdated = allocResponse.getUpdatedContainers().size(); |
| if (expectedAllocatedContainerNum == actualAllocated && |
| expectedUpdatedContainerNum == actualUpdated) { |
| break; |
| } |
| Thread.sleep(100); |
| iteration--; |
| } |
| return allocResponse; |
| } |
| |
| private void waitForNMContextUpdate(final UpdatedContainer updatedContainer, |
| final ExecutionType expectedType) { |
| for (int i=0; i<nodeCount; i++) { |
| final NodeManager nm = yarnCluster.getNodeManager(i); |
| if (nm.getNMContext().getNodeId() |
| .equals(updatedContainer.getContainer().getNodeId())) { |
| try { |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override public Boolean get() { |
| org.apache.hadoop.yarn.server.nodemanager.containermanager |
| .container.Container nmContainer = |
| nm.getNMContext().getContainers() |
| .get(updatedContainer.getContainer().getId()); |
| if (nmContainer != null) { |
| ExecutionType actual = nmContainer.getContainerTokenIdentifier() |
| .getExecutionType(); |
| return actual.equals(expectedType); |
| } |
| return false; |
| } |
| }, 1000, 30000); |
| } catch (TimeoutException e) { |
| fail("Times out waiting for container state in" |
| + " NM context to be updated"); |
| } catch (InterruptedException e) { |
| // Ignorable. |
| } |
| break; |
| } |
| |
| // Iterated all nodes but still can't get a match |
| if (i == nodeCount -1) { |
| fail("Container doesn't exist in NM context."); |
| } |
| } |
| } |
| |
| @Test(timeout=60000) |
| public void testAMRMClientWithContainerPromotion() |
| throws YarnException, IOException { |
| AMRMClientImpl<AMRMClient.ContainerRequest> amClient = |
| (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient |
| .createAMRMClient(); |
| //asserting we are not using the singleton instance cache |
| Assert.assertSame(NMTokenCache.getSingleton(), |
| amClient.getNMTokenCache()); |
| amClient.init(conf); |
| amClient.start(); |
| |
| // start am nm client |
| NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); |
| Assert.assertNotNull(nmClient); |
| // asserting we are using the singleton instance cache |
| Assert.assertSame( |
| NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); |
| nmClient.init(conf); |
| nmClient.start(); |
| assertEquals(STATE.STARTED, nmClient.getServiceState()); |
| |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| // setup container request |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| // START OPPORTUNISTIC Container, Send allocation request to RM |
| amClient.addContainerRequest( |
| new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, |
| true, null, ExecutionTypeRequest |
| .newInstance(ExecutionType.OPPORTUNISTIC, true))); |
| |
| int oppContainersRequestedAny = |
| amClient.getTable(0).get(priority2, ResourceRequest.ANY, |
| ExecutionType.OPPORTUNISTIC, capability).remoteRequest |
| .getNumContainers(); |
| |
| assertEquals(1, oppContainersRequestedAny); |
| assertEquals(1, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| // RM should allocate container within 2 calls to allocate() |
| int allocatedContainerCount = 0; |
| Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>(); |
| int iterationsLeft = 50; |
| |
| amClient.getNMTokenCache().clearCache(); |
| assertEquals(0, |
| amClient.getNMTokenCache().numberOfTokensInCache()); |
| |
| AllocateResponse allocResponse = null; |
| while (allocatedContainerCount < oppContainersRequestedAny |
| && iterationsLeft-- > 0) { |
| allocResponse = amClient.allocate(0.1f); |
| // let NM heartbeat to RM and trigger allocations |
| //triggerSchedulingWithNMHeartBeat(); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| allocatedContainerCount += |
| allocResponse.getAllocatedContainers().size(); |
| for (Container container : allocResponse.getAllocatedContainers()) { |
| if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { |
| allocatedOpportContainers.put(container.getId(), container); |
| } |
| } |
| if (allocatedContainerCount < oppContainersRequestedAny) { |
| // sleep to let NM's heartbeat to RM and trigger allocations |
| sleep(100); |
| } |
| } |
| |
| assertEquals(oppContainersRequestedAny, allocatedContainerCount); |
| assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size()); |
| |
| startContainer(allocResponse, nmClient); |
| |
| // SEND PROMOTION REQUEST TO RM |
| try { |
| Container c = allocatedOpportContainers.values().iterator().next(); |
| amClient.requestContainerUpdate( |
| c, UpdateContainerRequest.newInstance(c.getVersion(), |
| c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.OPPORTUNISTIC)); |
| fail("Should throw Exception.."); |
| } catch (IllegalArgumentException e) { |
| System.out.println("## " + e.getMessage()); |
| assertTrue(e.getMessage().contains( |
| "target should be GUARANTEED and original should be OPPORTUNISTIC")); |
| } |
| |
| Container c = allocatedOpportContainers.values().iterator().next(); |
| amClient.requestContainerUpdate( |
| c, UpdateContainerRequest.newInstance(c.getVersion(), |
| c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED)); |
| iterationsLeft = 120; |
| Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); |
| // do a few iterations to ensure RM is not going to send new containers |
| while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { |
| // inform RM of rejection |
| allocResponse = amClient.allocate(0.1f); |
| // RM did not send new containers because AM does not need any |
| if (allocResponse.getUpdatedContainers() != null) { |
| for (UpdatedContainer updatedContainer : allocResponse |
| .getUpdatedContainers()) { |
| System.out.println("Got update.."); |
| updatedContainers.put(updatedContainer.getContainer().getId(), |
| updatedContainer); |
| } |
| } |
| if (iterationsLeft > 0) { |
| // sleep to make sure NM's heartbeat |
| sleep(100); |
| } |
| } |
| assertEquals(1, updatedContainers.size()); |
| |
| for (ContainerId cId : allocatedOpportContainers.keySet()) { |
| Container orig = allocatedOpportContainers.get(cId); |
| UpdatedContainer updatedContainer = updatedContainers.get(cId); |
| assertNotNull(updatedContainer); |
| assertEquals(ExecutionType.GUARANTEED, |
| updatedContainer.getContainer().getExecutionType()); |
| assertEquals(orig.getResource(), |
| updatedContainer.getContainer().getResource()); |
| assertEquals(orig.getNodeId(), |
| updatedContainer.getContainer().getNodeId()); |
| assertEquals(orig.getVersion() + 1, |
| updatedContainer.getContainer().getVersion()); |
| } |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| // SEND UPDATE EXECTYPE UPDATE TO NM |
| updateContainerExecType(allocResponse, ExecutionType.GUARANTEED, nmClient); |
| |
| amClient.ask.clear(); |
| } |
| |
| @Test(timeout=60000) |
| public void testAMRMClientWithContainerDemotion() |
| throws YarnException, IOException { |
| AMRMClientImpl<AMRMClient.ContainerRequest> amClient = |
| (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient |
| .createAMRMClient(); |
| //asserting we are not using the singleton instance cache |
| Assert.assertSame(NMTokenCache.getSingleton(), |
| amClient.getNMTokenCache()); |
| amClient.init(conf); |
| amClient.start(); |
| |
| NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); |
| Assert.assertNotNull(nmClient); |
| // asserting we are using the singleton instance cache |
| Assert.assertSame( |
| NMTokenCache.getSingleton(), nmClient.getNMTokenCache()); |
| nmClient.init(conf); |
| nmClient.start(); |
| assertEquals(STATE.STARTED, nmClient.getServiceState()); |
| |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| // START OPPORTUNISTIC Container, Send allocation request to RM |
| amClient.addContainerRequest( |
| new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, |
| true, null, ExecutionTypeRequest |
| .newInstance(ExecutionType.GUARANTEED, true))); |
| |
| int oppContainersRequestedAny = |
| amClient.getTable(0).get(priority2, ResourceRequest.ANY, |
| ExecutionType.GUARANTEED, capability).remoteRequest |
| .getNumContainers(); |
| |
| assertEquals(1, oppContainersRequestedAny); |
| assertEquals(1, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| // RM should allocate container within 2 calls to allocate() |
| int allocatedContainerCount = 0; |
| Map<ContainerId, Container> allocatedGuaranteedContainers = new HashMap<>(); |
| int iterationsLeft = 50; |
| |
| amClient.getNMTokenCache().clearCache(); |
| assertEquals(0, |
| amClient.getNMTokenCache().numberOfTokensInCache()); |
| |
| AllocateResponse allocResponse = null; |
| while (allocatedContainerCount < oppContainersRequestedAny |
| && iterationsLeft-- > 0) { |
| allocResponse = amClient.allocate(0.1f); |
| // let NM heartbeat to RM and trigger allocations |
| //triggerSchedulingWithNMHeartBeat(); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| allocatedContainerCount += |
| allocResponse.getAllocatedContainers().size(); |
| for (Container container : allocResponse.getAllocatedContainers()) { |
| if (container.getExecutionType() == ExecutionType.GUARANTEED) { |
| allocatedGuaranteedContainers.put(container.getId(), container); |
| } |
| } |
| if (allocatedContainerCount < oppContainersRequestedAny) { |
| // sleep to let NM's heartbeat to RM and trigger allocations |
| sleep(100); |
| } |
| } |
| assertEquals(oppContainersRequestedAny, allocatedContainerCount); |
| assertEquals(oppContainersRequestedAny, |
| allocatedGuaranteedContainers.size()); |
| startContainer(allocResponse, nmClient); |
| |
| // SEND DEMOTION REQUEST TO RM |
| try { |
| Container c = allocatedGuaranteedContainers.values().iterator().next(); |
| amClient.requestContainerUpdate( |
| c, UpdateContainerRequest.newInstance(c.getVersion(), |
| c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, |
| null, ExecutionType.GUARANTEED)); |
| fail("Should throw Exception.."); |
| } catch (IllegalArgumentException e) { |
| System.out.println("## " + e.getMessage()); |
| assertTrue(e.getMessage().contains( |
| "target should be OPPORTUNISTIC and original should be GUARANTEED")); |
| } |
| |
| Container c = allocatedGuaranteedContainers.values().iterator().next(); |
| amClient.requestContainerUpdate( |
| c, UpdateContainerRequest.newInstance(c.getVersion(), |
| c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, |
| null, ExecutionType.OPPORTUNISTIC)); |
| iterationsLeft = 120; |
| Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); |
| // do a few iterations to ensure RM is not going to send new containers |
| while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { |
| // inform RM of rejection |
| allocResponse = amClient.allocate(0.1f); |
| // RM did not send new containers because AM does not need any |
| if (allocResponse.getUpdatedContainers() != null) { |
| for (UpdatedContainer updatedContainer : allocResponse |
| .getUpdatedContainers()) { |
| System.out.println("Got update.."); |
| updatedContainers.put(updatedContainer.getContainer().getId(), |
| updatedContainer); |
| } |
| } |
| if (iterationsLeft > 0) { |
| // sleep to make sure NM's heartbeat |
| sleep(100); |
| } |
| } |
| assertEquals(1, updatedContainers.size()); |
| |
| for (ContainerId cId : allocatedGuaranteedContainers.keySet()) { |
| Container orig = allocatedGuaranteedContainers.get(cId); |
| UpdatedContainer updatedContainer = updatedContainers.get(cId); |
| assertNotNull(updatedContainer); |
| assertEquals(ExecutionType.OPPORTUNISTIC, |
| updatedContainer.getContainer().getExecutionType()); |
| assertEquals(orig.getResource(), |
| updatedContainer.getContainer().getResource()); |
| assertEquals(orig.getNodeId(), |
| updatedContainer.getContainer().getNodeId()); |
| assertEquals(orig.getVersion() + 1, |
| updatedContainer.getContainer().getVersion()); |
| } |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| updateContainerExecType(allocResponse, ExecutionType.OPPORTUNISTIC, |
| nmClient); |
| amClient.ask.clear(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private void updateContainerExecType(AllocateResponse allocResponse, |
| ExecutionType expectedExecType, NMClientImpl nmClient) |
| throws IOException, YarnException { |
| for (UpdatedContainer updatedContainer : allocResponse |
| .getUpdatedContainers()) { |
| Container container = updatedContainer.getContainer(); |
| if (!autoUpdate) { |
| nmClient.increaseContainerResource(container); |
| } |
| // NodeManager may still need some time to get the stable |
| // container status |
| while (true) { |
| ContainerStatus status = nmClient |
| .getContainerStatus(container.getId(), container.getNodeId()); |
| if (status.getExecutionType() == expectedExecType) { |
| break; |
| } |
| sleep(10); |
| } |
| } |
| } |
| |
| private void startContainer(AllocateResponse allocResponse, |
| NMClientImpl nmClient) throws IOException, YarnException { |
| // START THE CONTAINER IN NM |
| // build container launch context |
| Credentials ts = new Credentials(); |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| ts.writeTokenStorageToStream(dob); |
| ByteBuffer securityTokens = |
| ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); |
| // start a process long enough for increase/decrease action to take effect |
| ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext( |
| Collections.<String, LocalResource>emptyMap(), |
| new HashMap<String, String>(), Arrays.asList("sleep", "100"), |
| new HashMap<String, ByteBuffer>(), securityTokens, |
| new HashMap<ApplicationAccessType, String>()); |
| // start the containers and make sure they are in RUNNING state |
| for (Container container : allocResponse.getAllocatedContainers()) { |
| nmClient.startContainer(container, clc); |
| // NodeManager may still need some time to get the stable |
| // container status |
| while (true) { |
| ContainerStatus status = nmClient |
| .getContainerStatus(container.getId(), container.getNodeId()); |
| if (status.getState() == ContainerState.RUNNING) { |
| break; |
| } |
| sleep(10); |
| } |
| } |
| } |
| |
| |
| private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient) |
| throws YarnException, IOException { |
| // setup container request |
| |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| |
| assertNumContainers(amClient, 0, 2, 2, 2, 3, 0); |
| int containersRequestedAny = 2; |
| |
| // RM should allocate container within 2 calls to allocate() |
| int allocatedContainerCount = 0; |
| int iterationsLeft = 3; |
| Set<ContainerId> releases = new TreeSet<ContainerId>(); |
| |
| amClient.getNMTokenCache().clearCache(); |
| assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache()); |
| HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>(); |
| |
| while (allocatedContainerCount < containersRequestedAny |
| && iterationsLeft-- > 0) { |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| assertEquals(nodeCount, amClient.getClusterNodeCount()); |
| allocatedContainerCount += allocResponse.getAllocatedContainers().size(); |
| for(Container container : allocResponse.getAllocatedContainers()) { |
| ContainerId rejectContainerId = container.getId(); |
| releases.add(rejectContainerId); |
| amClient.releaseAssignedContainer(rejectContainerId); |
| } |
| |
| for (NMToken token : allocResponse.getNMTokens()) { |
| String nodeID = token.getNodeId().toString(); |
| if (receivedNMTokens.containsKey(nodeID)) { |
| fail("Received token again for : " + nodeID); |
| } |
| receivedNMTokens.put(nodeID, token.getToken()); |
| } |
| |
| if(allocatedContainerCount < containersRequestedAny) { |
| // let NM heartbeat to RM and trigger allocations |
| triggerSchedulingWithNMHeartBeat(); |
| } |
| } |
| |
| // Should receive atleast 1 token |
| assertTrue(receivedNMTokens.size() > 0 |
| && receivedNMTokens.size() <= nodeCount); |
| |
| assertEquals(allocatedContainerCount, containersRequestedAny); |
| assertEquals(2, releases.size()); |
| assertEquals(0, amClient.ask.size()); |
| |
| // need to tell the AMRMClient that we dont need these resources anymore |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| assertEquals(3, amClient.ask.size()); |
| // send 0 container count request for resources that are no longer needed |
| ResourceRequest snoopRequest = amClient.ask.iterator().next(); |
| assertEquals(0, snoopRequest.getNumContainers()); |
| |
| // test RPC exception handling |
| amClient.addContainerRequest(new ContainerRequest(capability, nodes, |
| racks, priority)); |
| amClient.addContainerRequest(new ContainerRequest(capability, nodes, |
| racks, priority)); |
| snoopRequest = amClient.ask.iterator().next(); |
| assertEquals(2, snoopRequest.getNumContainers()); |
| |
| ApplicationMasterProtocol realRM = amClient.rmClient; |
| try { |
| ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); |
| when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer( |
| new Answer<AllocateResponse>() { |
| public AllocateResponse answer(InvocationOnMock invocation) |
| throws Exception { |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, |
| racks, priority)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| throw new Exception(); |
| } |
| }); |
| amClient.rmClient = mockRM; |
| amClient.allocate(0.1f); |
| }catch (Exception ioe) {} |
| finally { |
| amClient.rmClient = realRM; |
| } |
| |
| assertEquals(2, amClient.release.size()); |
| assertEquals(3, amClient.ask.size()); |
| snoopRequest = amClient.ask.iterator().next(); |
| // verify that the remove request made in between makeRequest and allocate |
| // has not been lost |
| assertEquals(0, snoopRequest.getNumContainers()); |
| |
| waitForContainerCompletion(3, amClient, releases); |
| } |
| |
| private void waitForContainerCompletion(int numIterations, |
| AMRMClientImpl<ContainerRequest> amClient, Set<ContainerId> releases) |
| throws YarnException, IOException { |
| // do a few iterations to ensure RM is not going send new containers |
| while(!releases.isEmpty() || numIterations-- > 0) { |
| // inform RM of rejection |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| // RM did not send new containers because AM does not need any |
| assertEquals(0, allocResponse.getAllocatedContainers().size()); |
| if(allocResponse.getCompletedContainersStatuses().size() > 0) { |
| for(ContainerStatus cStatus :allocResponse |
| .getCompletedContainersStatuses()) { |
| if(releases.contains(cStatus.getContainerId())) { |
| assertEquals(cStatus.getState(), ContainerState.COMPLETE); |
| assertEquals(-100, cStatus.getExitStatus()); |
| releases.remove(cStatus.getContainerId()); |
| } |
| } |
| } |
| if(numIterations > 0) { |
| // let NM heartbeat to RM and trigger allocations |
| triggerSchedulingWithNMHeartBeat(); |
| } |
| } |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| } |
| |
| private void testAllocRequestId( |
| final AMRMClientImpl<ContainerRequest> amClient) throws YarnException, |
| IOException { |
| // setup container request |
| |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority, 1)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority, 1)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority, 2)); |
| amClient.addContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority, 2)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority, 1)); |
| amClient.removeContainerRequest( |
| new ContainerRequest(capability, nodes, racks, priority, 2)); |
| |
| assertNumContainers(amClient, 0, 1, 1, 1, 9, 0); |
| assertNumContainers(amClient, 1, 1, 1, 1, 9, 0); |
| assertNumContainers(amClient, 2, 1, 1, 1, 9, 0); |
| int containersRequestedAny = 3; |
| |
| // RM should allocate container within 2 calls to allocate() |
| List<Container> allocatedContainers = new ArrayList<>(); |
| int iterationsLeft = 5; |
| Set<ContainerId> releases = new TreeSet<ContainerId>(); |
| |
| while (allocatedContainers.size() < containersRequestedAny |
| && iterationsLeft-- > 0) { |
| AllocateResponse allocResponse = amClient.allocate(0.1f); |
| assertEquals(0, amClient.ask.size()); |
| assertEquals(0, amClient.release.size()); |
| |
| allocatedContainers.addAll(allocResponse.getAllocatedContainers()); |
| for(Container container : allocResponse.getAllocatedContainers()) { |
| ContainerId rejectContainerId = container.getId(); |
| releases.add(rejectContainerId); |
| amClient.releaseAssignedContainer(rejectContainerId); |
| } |
| |
| if(allocatedContainers.size() < containersRequestedAny) { |
| // let NM heartbeat to RM and trigger allocations |
| triggerSchedulingWithNMHeartBeat(); |
| } |
| } |
| |
| assertEquals(containersRequestedAny, allocatedContainers.size()); |
| Set<Long> expAllocIds = new HashSet<>( |
| Arrays.asList(Long.valueOf(0), Long.valueOf(1), Long.valueOf(2))); |
| Set<Long> actAllocIds = new HashSet<>(); |
| for (Container ac : allocatedContainers) { |
| actAllocIds.add(Long.valueOf(ac.getAllocationRequestId())); |
| } |
| assertEquals(expAllocIds, actAllocIds); |
| assertEquals(3, amClient.release.size()); |
| assertEquals(0, amClient.ask.size()); |
| |
| waitForContainerCompletion(3, amClient, releases); |
| } |
| |
| private void assertNumContainers(AMRMClientImpl<ContainerRequest> amClient, |
| long allocationReqId, int expNode, int expRack, int expAny, |
| int expAsks, int expRelease) { |
| RemoteRequestsTable<ContainerRequest> remoteRequestsTable = |
| amClient.getTable(allocationReqId); |
| int containersRequestedNode = remoteRequestsTable.get(priority, |
| node, ExecutionType.GUARANTEED, capability).remoteRequest |
| .getNumContainers(); |
| int containersRequestedRack = remoteRequestsTable.get(priority, |
| rack, ExecutionType.GUARANTEED, capability).remoteRequest |
| .getNumContainers(); |
| int containersRequestedAny = remoteRequestsTable.get(priority, |
| ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) |
| .remoteRequest.getNumContainers(); |
| |
| assertEquals(expNode, containersRequestedNode); |
| assertEquals(expRack, containersRequestedRack); |
| assertEquals(expAny, containersRequestedAny); |
| assertEquals(expAsks, amClient.ask.size()); |
| assertEquals(expRelease, amClient.release.size()); |
| } |
| |
| class CountDownSupplier implements Supplier<Boolean> { |
| int counter = 0; |
| @Override |
| public Boolean get() { |
| counter++; |
| if (counter >= 3) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| }; |
| |
| @Test |
| public void testWaitFor() throws InterruptedException { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| CountDownSupplier countDownChecker = new CountDownSupplier(); |
| |
| try { |
| // start am rm client |
| amClient = |
| (AMRMClientImpl<ContainerRequest>) AMRMClient |
| .<ContainerRequest> createAMRMClient(); |
| amClient.init(new YarnConfiguration()); |
| amClient.start(); |
| amClient.waitFor(countDownChecker, 1000); |
| assertEquals(3, countDownChecker.counter); |
| } finally { |
| if (amClient != null) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| private void sleep(int sleepTime) { |
| try { |
| Thread.sleep(sleepTime); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, |
| IOException { |
| AMRMClient<ContainerRequest> amClient = null; |
| try { |
| AMRMTokenSecretManager amrmTokenSecretManager = |
| yarnCluster.getResourceManager().getRMContext() |
| .getAMRMTokenSecretManager(); |
| |
| // start am rm client |
| amClient = AMRMClient.<ContainerRequest> createAMRMClient(); |
| |
| amClient.init(conf); |
| amClient.start(); |
| |
| Long startTime = System.currentTimeMillis(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 = |
| getAMRMToken(); |
| Assert.assertNotNull(amrmToken_1); |
| assertEquals(amrmToken_1.decodeIdentifier().getKeyId(), |
| amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId()); |
| |
| // Wait for enough time and make sure the roll_over happens |
| // At mean time, the old AMRMToken should continue to work |
| while (System.currentTimeMillis() - startTime < |
| rolling_interval_sec * 1000) { |
| amClient.allocate(0.1f); |
| sleep(1000); |
| } |
| amClient.allocate(0.1f); |
| |
| org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 = |
| getAMRMToken(); |
| Assert.assertNotNull(amrmToken_2); |
| assertEquals(amrmToken_2.decodeIdentifier().getKeyId(), |
| amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId()); |
| |
| Assert.assertNotEquals(amrmToken_1, amrmToken_2); |
| |
| // can do the allocate call with latest AMRMToken |
| AllocateResponse response = amClient.allocate(0.1f); |
| |
| // Verify latest AMRMToken can be used to send allocation request. |
| UserGroupInformation testUser1 = |
| UserGroupInformation.createRemoteUser("testUser1"); |
| |
| AMRMTokenIdentifierForTest newVersionTokenIdentifier = |
| new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message"); |
| |
| assertEquals("Message is changed after set to newVersionTokenIdentifier", |
| "message", newVersionTokenIdentifier.getMessage()); |
| org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken = |
| new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> ( |
| newVersionTokenIdentifier.getBytes(), |
| amrmTokenSecretManager.retrievePassword(newVersionTokenIdentifier), |
| newVersionTokenIdentifier.getKind(), new Text()); |
| |
| SecurityUtil.setTokenService(newVersionToken, yarnCluster |
| .getResourceManager().getApplicationMasterService().getBindAddress()); |
| testUser1.addToken(newVersionToken); |
| |
| AllocateRequest request = Records.newRecord(AllocateRequest.class); |
| request.setResponseId(response.getResponseId()); |
| testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { |
| @Override |
| public ApplicationMasterProtocol run() { |
| return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy( |
| ApplicationMasterProtocol.class, |
| yarnCluster.getResourceManager().getApplicationMasterService() |
| .getBindAddress(), conf); |
| } |
| }).allocate(request); |
| |
| // Make sure previous token has been rolled-over |
| // and can not use this rolled-over token to make a allocate all. |
| while (true) { |
| if (amrmToken_2.decodeIdentifier().getKeyId() != amrmTokenSecretManager |
| .getCurrnetMasterKeyData().getMasterKey().getKeyId()) { |
| if (amrmTokenSecretManager.getNextMasterKeyData() == null) { |
| break; |
| } else if (amrmToken_2.decodeIdentifier().getKeyId() != |
| amrmTokenSecretManager.getNextMasterKeyData().getMasterKey() |
| .getKeyId()) { |
| break; |
| } |
| } |
| amClient.allocate(0.1f); |
| sleep(1000); |
| } |
| |
| try { |
| UserGroupInformation testUser2 = |
| UserGroupInformation.createRemoteUser("testUser2"); |
| SecurityUtil.setTokenService(amrmToken_2, yarnCluster |
| .getResourceManager().getApplicationMasterService().getBindAddress()); |
| testUser2.addToken(amrmToken_2); |
| testUser2.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { |
| @Override |
| public ApplicationMasterProtocol run() { |
| return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy( |
| ApplicationMasterProtocol.class, |
| yarnCluster.getResourceManager().getApplicationMasterService() |
| .getBindAddress(), conf); |
| } |
| }).allocate(Records.newRecord(AllocateRequest.class)); |
| fail("The old Token should not work"); |
| } catch (Exception ex) { |
| assertTrue(ex instanceof InvalidToken); |
| assertTrue(ex.getMessage().contains( |
| "Invalid AMRMToken from " |
| + amrmToken_2.decodeIdentifier().getApplicationAttemptId())); |
| } |
| |
| amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, |
| null, null); |
| |
| } finally { |
| if (amClient != null && amClient.getServiceState() == STATE.STARTED) { |
| amClient.stop(); |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> |
| getAMRMToken() throws IOException { |
| Credentials credentials = |
| UserGroupInformation.getCurrentUser().getCredentials(); |
| Iterator<org.apache.hadoop.security.token.Token<?>> iter = |
| credentials.getAllTokens().iterator(); |
| org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> result = null; |
| while (iter.hasNext()) { |
| org.apache.hadoop.security.token.Token<?> token = iter.next(); |
| if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { |
| if (result != null) { |
| fail("credentials has more than one AMRM token." |
| + " token1: " + result + " token2: " + token); |
| } |
| result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>) |
| token; |
| } |
| } |
| return result; |
| } |
| |
| @Test(timeout = 60000) |
| public void testNoUpdateTrackingUrl() { |
| try { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| amClient = new AMRMClientImpl<>(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| assertEquals("", amClient.appTrackingUrl); |
| |
| ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); |
| AllocateResponse mockResponse = mock(AllocateResponse.class); |
| when(mockRM.allocate(any(AllocateRequest.class))) |
| .thenReturn(mockResponse); |
| ApplicationMasterProtocol realRM = amClient.rmClient; |
| amClient.rmClient = mockRM; |
| // Do allocate without updated tracking url |
| amClient.allocate(0.1f); |
| ArgumentCaptor<AllocateRequest> argument = |
| ArgumentCaptor.forClass(AllocateRequest.class); |
| verify(mockRM).allocate(argument.capture()); |
| assertNull(argument.getValue().getTrackingUrl()); |
| |
| amClient.rmClient = realRM; |
| amClient |
| .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, |
| null); |
| } catch (IOException | YarnException e) { |
| throw new AssertionError( |
| "testNoUpdateTrackingUrl unexpectedly threw exception: " + e); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testUpdateTrackingUrl() { |
| try { |
| AMRMClientImpl<ContainerRequest> amClient = null; |
| amClient = new AMRMClientImpl<>(); |
| amClient.init(conf); |
| amClient.start(); |
| amClient.registerApplicationMaster("Host", 10000, ""); |
| |
| String trackingUrl = "hadoop.apache.org"; |
| assertEquals("", amClient.appTrackingUrl); |
| |
| ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class); |
| AllocateResponse mockResponse = mock(AllocateResponse.class); |
| when(mockRM.allocate(any(AllocateRequest.class))) |
| .thenReturn(mockResponse); |
| ApplicationMasterProtocol realRM = amClient.rmClient; |
| amClient.rmClient = mockRM; |
| // Do allocate with updated tracking url |
| amClient.updateTrackingUrl(trackingUrl); |
| assertEquals(trackingUrl, amClient.newTrackingUrl); |
| assertEquals("", amClient.appTrackingUrl); |
| amClient.allocate(0.1f); |
| assertNull(amClient.newTrackingUrl); |
| assertEquals(trackingUrl, amClient.appTrackingUrl); |
| ArgumentCaptor<AllocateRequest> argument |
| = ArgumentCaptor.forClass(AllocateRequest.class); |
| verify(mockRM).allocate(argument.capture()); |
| assertEquals(trackingUrl, argument.getValue().getTrackingUrl()); |
| |
| amClient.rmClient = realRM; |
| amClient |
| .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, |
| null); |
| } catch (IOException | YarnException e) { |
| throw new AssertionError( |
| "testUpdateTrackingUrl unexpectedly threw exception: " + e); |
| } |
| } |
| } |