| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.client.AMRMClientUtils; |
| import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; |
| import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Unit test for AMRMClientRelayer. |
| */ |
| public class TestAMRMClientRelayer { |
| |
| /** |
| * Mocked ApplicationMasterService in RM. |
| */ |
| public static class MockApplicationMasterService |
| implements ApplicationMasterProtocol { |
| |
| // Whether this mockRM will throw failover exception upon next heartbeat |
| // from AM |
| private boolean failover = false; |
| |
| // Whether this mockRM will throw application already registered exception |
| // upon next registerApplicationMaster call |
| private boolean throwAlreadyRegister = false; |
| |
| private int responseIdReset = -1; |
| private List<ResourceRequest> lastAsk; |
| private List<ContainerId> lastRelease; |
| private List<String> lastBlacklistAdditions; |
| private List<String> lastBlacklistRemovals; |
| |
| @Override |
| public RegisterApplicationMasterResponse registerApplicationMaster( |
| RegisterApplicationMasterRequest request) |
| throws YarnException, IOException { |
| if (this.throwAlreadyRegister) { |
| this.throwAlreadyRegister = false; |
| throw new InvalidApplicationMasterRequestException( |
| AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + "appId"); |
| } |
| return null; |
| } |
| |
| @Override |
| public FinishApplicationMasterResponse finishApplicationMaster( |
| FinishApplicationMasterRequest request) |
| throws YarnException, IOException { |
| if (this.failover) { |
| this.failover = false; |
| throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); |
| } |
| return null; |
| } |
| |
| @Override |
| public AllocateResponse allocate(AllocateRequest request) |
| throws YarnException, IOException { |
| if (this.failover) { |
| this.failover = false; |
| throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); |
| } |
| if (this.responseIdReset != -1) { |
| String errorMessage = |
| AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(null, |
| this.responseIdReset, request.getResponseId()); |
| this.responseIdReset = -1; |
| throw new InvalidApplicationMasterRequestException(errorMessage); |
| } |
| |
| this.lastAsk = request.getAskList(); |
| this.lastRelease = request.getReleaseList(); |
| this.lastBlacklistAdditions = |
| request.getResourceBlacklistRequest().getBlacklistAdditions(); |
| this.lastBlacklistRemovals = |
| request.getResourceBlacklistRequest().getBlacklistRemovals(); |
| return AllocateResponse.newInstance(request.getResponseId() + 1, null, |
| null, new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null, |
| 0, null, null); |
| } |
| |
| public void setFailoverFlag() { |
| this.failover = true; |
| } |
| |
| public void setThrowAlreadyRegister() { |
| this.throwAlreadyRegister = true; |
| } |
| |
| public void setResponseIdReset(int expectedResponseId) { |
| this.responseIdReset = expectedResponseId; |
| } |
| } |
| |
| private Configuration conf; |
| private MockApplicationMasterService mockAMS; |
| private AMRMClientRelayer relayer; |
| |
| private int responseId = 0; |
| |
| // Buffer of asks that will be sent to RM in the next AM heartbeat |
| private List<ResourceRequest> asks = new ArrayList<>(); |
| private List<ContainerId> releases = new ArrayList<>(); |
| private List<String> blacklistAdditions = new ArrayList<>(); |
| private List<String> blacklistRemoval = new ArrayList<>(); |
| |
| @Before |
| public void setup() throws YarnException, IOException { |
| this.conf = new Configuration(); |
| |
| this.mockAMS = new MockApplicationMasterService(); |
| this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST"); |
| this.relayer.registerApplicationMaster( |
| RegisterApplicationMasterRequest.newInstance("", 0, "")); |
| |
| clearAllocateRequestLists(); |
| } |
| |
| @After |
| public void cleanup() { |
| this.relayer.shutdown(); |
| } |
| |
| private void assertAsksAndReleases(int expectedAsk, int expectedRelease) { |
| Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size()); |
| Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size()); |
| } |
| |
| private void assertBlacklistAdditionsAndRemovals(int expectedAdditions, |
| int expectedRemovals) { |
| Assert.assertEquals(expectedAdditions, |
| this.mockAMS.lastBlacklistAdditions.size()); |
| Assert.assertEquals(expectedRemovals, |
| this.mockAMS.lastBlacklistRemovals.size()); |
| } |
| |
| private AllocateRequest getAllocateRequest() { |
| // Need to create a new one every time because rather than directly |
| // referring the lists, the protobuf impl makes a copy of the lists |
| return AllocateRequest.newInstance(responseId, 0, asks, releases, |
| ResourceBlacklistRequest.newInstance(blacklistAdditions, |
| blacklistRemoval)); |
| } |
| |
| private void clearAllocateRequestLists() { |
| this.asks.clear(); |
| this.releases.clear(); |
| this.blacklistAdditions.clear(); |
| this.blacklistRemoval.clear(); |
| } |
| |
| private static ContainerId createContainerId(int id) { |
| return ContainerId.newContainerId( |
| ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), |
| id); |
| } |
| |
| protected ResourceRequest createResourceRequest(long id, String resource, |
| int memory, int vCores, int priority, ExecutionType execType, |
| int containers) { |
| ResourceRequest req = Records.newRecord(ResourceRequest.class); |
| req.setAllocationRequestId(id); |
| req.setResourceName(resource); |
| req.setCapability(Resource.newInstance(memory, vCores)); |
| req.setPriority(Priority.newInstance(priority)); |
| req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType)); |
| req.setNumContainers(containers); |
| return req; |
| } |
| |
| /** |
| * Test the proper handling of removal/cancel of resource requests. |
| */ |
| @Test |
| public void testResourceRequestCleanup() throws YarnException, IOException { |
| // Ask for two containers, one with location preference |
| this.asks.add(createResourceRequest(0, "node", 2048, 1, 1, |
| ExecutionType.GUARANTEED, 1)); |
| this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1, |
| ExecutionType.GUARANTEED, 1)); |
| this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, |
| ExecutionType.GUARANTEED, 2)); |
| this.relayer.allocate(getAllocateRequest()); |
| |
| assertAsksAndReleases(3, 0); |
| Assert.assertEquals(1, this.relayer.getRemotePendingAsks().size()); |
| ResourceRequestSet set = |
| this.relayer.getRemotePendingAsks().values().iterator().next(); |
| Assert.assertEquals(3, set.getAsks().size()); |
| clearAllocateRequestLists(); |
| |
| // Cancel one ask |
| this.asks.add(createResourceRequest(0, "node", 2048, 1, 1, |
| ExecutionType.GUARANTEED, 0)); |
| this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, |
| ExecutionType.GUARANTEED, 1)); |
| this.relayer.allocate(getAllocateRequest()); |
| |
| assertAsksAndReleases(2, 0); |
| Assert.assertEquals(1, relayer.getRemotePendingAsks().size()); |
| set = this.relayer.getRemotePendingAsks().values().iterator().next(); |
| Assert.assertEquals(2, set.getAsks().size()); |
| clearAllocateRequestLists(); |
| |
| // Cancel the other ask, the pending askSet should be removed |
| this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, |
| ExecutionType.GUARANTEED, 0)); |
| this.relayer.allocate(AllocateRequest.newInstance(0, 0, asks, null, null)); |
| |
| assertAsksAndReleases(1, 0); |
| Assert.assertEquals(0, this.relayer.getRemotePendingAsks().size()); |
| } |
| |
| /** |
| * Test the full pending resend after RM fails over. |
| */ |
| @Test |
| public void testResendRequestsOnRMRestart() |
| throws YarnException, IOException { |
| ContainerId c1 = createContainerId(1); |
| ContainerId c2 = createContainerId(2); |
| ContainerId c3 = createContainerId(3); |
| |
| // Ask for two containers, one with location preference |
| this.asks.add(createResourceRequest(0, "node1", 2048, 1, 1, |
| ExecutionType.GUARANTEED, 1)); |
| this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1, |
| ExecutionType.GUARANTEED, 1)); |
| this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, |
| ExecutionType.GUARANTEED, 2)); |
| |
| this.releases.add(c1); |
| this.blacklistAdditions.add("node1"); |
| this.blacklistRemoval.add("node0"); |
| |
| // 1. a fully loaded request |
| this.relayer.allocate(getAllocateRequest()); |
| assertAsksAndReleases(3, 1); |
| assertBlacklistAdditionsAndRemovals(1, 1); |
| clearAllocateRequestLists(); |
| |
| // 2. empty request |
| this.relayer.allocate(getAllocateRequest()); |
| assertAsksAndReleases(0, 0); |
| assertBlacklistAdditionsAndRemovals(0, 0); |
| clearAllocateRequestLists(); |
| |
| // Set RM restart and failover flag |
| this.mockAMS.setFailoverFlag(); |
| |
| // More requests |
| this.blacklistAdditions.add("node2"); |
| this.releases.add(c2); |
| this.relayer.allocate(getAllocateRequest()); |
| |
| // verify pending requests are fully re-sent |
| assertAsksAndReleases(3, 2); |
| assertBlacklistAdditionsAndRemovals(2, 0); |
| clearAllocateRequestLists(); |
| } |
| |
| @Test |
| public void testResponseIdResync() throws YarnException, IOException { |
| this.responseId = 10; |
| |
| AllocateResponse response = this.relayer.allocate(getAllocateRequest()); |
| Assert.assertEquals(this.responseId + 1, response.getResponseId()); |
| |
| int expected = 5; |
| this.mockAMS.setResponseIdReset(expected); |
| |
| try { |
| this.relayer.allocate(getAllocateRequest()); |
| Assert.fail("Expecting exception from RM"); |
| } catch (InvalidApplicationMasterRequestException e) { |
| // Expected exception |
| } |
| |
| // Verify that the responseId is overridden |
| response = this.relayer.allocate(getAllocateRequest()); |
| Assert.assertEquals(expected + 1, response.getResponseId()); |
| |
| // Verify it is no longer overriden |
| this.responseId = response.getResponseId(); |
| response = this.relayer.allocate(getAllocateRequest()); |
| Assert.assertEquals(this.responseId + 1, response.getResponseId()); |
| } |
| |
| @Test |
| public void testConcurrentReregister() throws YarnException, IOException { |
| |
| // Set RM restart and failover flag |
| this.mockAMS.setFailoverFlag(); |
| |
| this.mockAMS.setThrowAlreadyRegister(); |
| |
| relayer.finishApplicationMaster(null); |
| } |
| } |