blob: 46570a1465d8d82373c2b04ff693acfac5185a25 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Unit test for AMRMClientRelayer.
 *
 * <p>The relayer under test talks to an in-process
 * {@link MockApplicationMasterService} instead of a real RM. That lets each
 * test inspect exactly what the relayer forwarded and inject RM-side failures.
 */
public class TestAMRMClientRelayer {

  /**
   * Mocked ApplicationMasterService in RM.
   *
   * <p>Remembers the asks, releases and blacklist changes of the most recent
   * allocate call that was not rejected. It can also be armed to fail the next
   * register/allocate/finish call in the ways exercised by the tests below.
   */
  public static class MockApplicationMasterService
      implements ApplicationMasterProtocol {
    // Whether this mockRM will throw failover exception upon next heartbeat
    // from AM (applies to both allocate and finishApplicationMaster)
    private boolean failover = false;
    // Whether this mockRM will throw application already registered exception
    // upon next registerApplicationMaster call
    private boolean throwAlreadyRegister = false;
    // When not -1, the next allocate call is rejected with an invalid
    // responseId error that reports this value as the expected responseId
    private int responseIdReset = -1;
    // Contents of the most recent allocate request that was not rejected
    private List<ResourceRequest> lastAsk;
    private List<ContainerId> lastRelease;
    private List<String> lastBlacklistAdditions;
    private List<String> lastBlacklistRemovals;

    /**
     * Registers the AM. Returns {@code null}, or throws once with an
     * "already registered" error if {@link #setThrowAlreadyRegister()} was
     * called.
     */
    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(
        RegisterApplicationMasterRequest request)
        throws YarnException, IOException {
      if (this.throwAlreadyRegister) {
        // One-shot: reset before throwing so the next attempt succeeds
        this.throwAlreadyRegister = false;
        throw new InvalidApplicationMasterRequestException(
            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + "appId");
      }
      return null;
    }

    /**
     * Unregisters the AM. Returns {@code null}, or throws once with
     * {@link ApplicationMasterNotRegisteredException} if the failover flag is
     * set.
     */
    @Override
    public FinishApplicationMasterResponse finishApplicationMaster(
        FinishApplicationMasterRequest request)
        throws YarnException, IOException {
      if (this.failover) {
        this.failover = false;
        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
      }
      return null;
    }

    /**
     * Handles a heartbeat. Records the request's ask, release and blacklist
     * lists, then answers with {@code request.getResponseId() + 1}. A pending
     * failover or responseId reset makes this call throw instead, without
     * recording anything.
     */
    @Override
    public AllocateResponse allocate(AllocateRequest request)
        throws YarnException, IOException {
      if (this.failover) {
        this.failover = false;
        throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
      }
      if (this.responseIdReset != -1) {
        String errorMessage =
            AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(null,
                this.responseIdReset, request.getResponseId());
        this.responseIdReset = -1;
        throw new InvalidApplicationMasterRequestException(errorMessage);
      }
      this.lastAsk = request.getAskList();
      this.lastRelease = request.getReleaseList();
      this.lastBlacklistAdditions =
          request.getResourceBlacklistRequest().getBlacklistAdditions();
      this.lastBlacklistRemovals =
          request.getResourceBlacklistRequest().getBlacklistRemovals();
      return AllocateResponse.newInstance(request.getResponseId() + 1, null,
          null, new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null,
          0, null, null);
    }

    /** Makes the next allocate or finishApplicationMaster call throw. */
    public void setFailoverFlag() {
      this.failover = true;
    }

    /** Makes the next registerApplicationMaster call throw. */
    public void setThrowAlreadyRegister() {
      this.throwAlreadyRegister = true;
    }

    /**
     * Makes the next allocate call throw an invalid responseId error.
     *
     * @param expectedResponseId the responseId reported as expected by RM
     */
    public void setResponseIdReset(int expectedResponseId) {
      this.responseIdReset = expectedResponseId;
    }
  }

  private Configuration conf;
  private MockApplicationMasterService mockAMS;
  private AMRMClientRelayer relayer;
  private int responseId = 0;

  // Buffer of asks that will be sent to RM in the next AM heartbeat
  private List<ResourceRequest> asks = new ArrayList<>();
  private List<ContainerId> releases = new ArrayList<>();
  private List<String> blacklistAdditions = new ArrayList<>();
  private List<String> blacklistRemoval = new ArrayList<>();

  /** Creates a fresh mock RM and relayer, and registers through the relayer. */
  @Before
  public void setup() throws YarnException, IOException {
    this.conf = new Configuration();
    this.mockAMS = new MockApplicationMasterService();
    this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
    this.relayer.registerApplicationMaster(
        RegisterApplicationMasterRequest.newInstance("", 0, ""));
    clearAllocateRequestLists();
  }

  /** Shuts the relayer down. */
  @After
  public void cleanup() {
    // JUnit runs @After even when @Before failed; if the relayer was never
    // constructed, avoid masking that failure with a NullPointerException.
    if (this.relayer != null) {
      this.relayer.shutdown();
    }
  }

  /**
   * Asserts the sizes of the ask and release lists the mock RM recorded from
   * its last accepted allocate call.
   */
  private void assertAsksAndReleases(int expectedAsk, int expectedRelease) {
    Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size());
    Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size());
  }

  /**
   * Asserts the sizes of the blacklist addition and removal lists the mock RM
   * recorded from its last accepted allocate call.
   */
  private void assertBlacklistAdditionsAndRemovals(int expectedAdditions,
      int expectedRemovals) {
    Assert.assertEquals(expectedAdditions,
        this.mockAMS.lastBlacklistAdditions.size());
    Assert.assertEquals(expectedRemovals,
        this.mockAMS.lastBlacklistRemovals.size());
  }

  /** Builds an allocate request from the current responseId and buffers. */
  private AllocateRequest getAllocateRequest() {
    // Need to create a new one every time because rather than directly
    // referring the lists, the protobuf impl makes a copy of the lists
    return AllocateRequest.newInstance(responseId, 0, asks, releases,
        ResourceBlacklistRequest.newInstance(blacklistAdditions,
            blacklistRemoval));
  }

  /** Empties the ask, release and blacklist buffers. */
  private void clearAllocateRequestLists() {
    this.asks.clear();
    this.releases.clear();
    this.blacklistAdditions.clear();
    this.blacklistRemoval.clear();
  }

  /** Creates a container id under a fixed application attempt. */
  private static ContainerId createContainerId(int id) {
    return ContainerId.newContainerId(
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        id);
  }

  /** Builds a resource request from the given fields. */
  protected ResourceRequest createResourceRequest(long id, String resource,
      int memory, int vCores, int priority, ExecutionType execType,
      int containers) {
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    req.setAllocationRequestId(id);
    req.setResourceName(resource);
    req.setCapability(Resource.newInstance(memory, vCores));
    req.setPriority(Priority.newInstance(priority));
    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType));
    req.setNumContainers(containers);
    return req;
  }

  /**
   * Test the proper handling of removal/cancel of resource requests.
   */
  @Test
  public void testResourceRequestCleanup() throws YarnException, IOException {
    // Ask for two containers, one with location preference
    this.asks.add(createResourceRequest(0, "node", 2048, 1, 1,
        ExecutionType.GUARANTEED, 1));
    this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1,
        ExecutionType.GUARANTEED, 1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));
    this.relayer.allocate(getAllocateRequest());

    assertAsksAndReleases(3, 0);
    Assert.assertEquals(1, this.relayer.getRemotePendingAsks().size());
    ResourceRequestSet set =
        this.relayer.getRemotePendingAsks().values().iterator().next();
    Assert.assertEquals(3, set.getAsks().size());
    clearAllocateRequestLists();

    // Cancel one ask
    this.asks.add(createResourceRequest(0, "node", 2048, 1, 1,
        ExecutionType.GUARANTEED, 0));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 1));
    this.relayer.allocate(getAllocateRequest());

    assertAsksAndReleases(2, 0);
    Assert.assertEquals(1, this.relayer.getRemotePendingAsks().size());
    set = this.relayer.getRemotePendingAsks().values().iterator().next();
    Assert.assertEquals(2, set.getAsks().size());
    clearAllocateRequestLists();

    // Cancel the other ask, the pending askSet should be removed
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 0));
    // NOTE(review): null release list / blacklist request are passed here,
    // unlike the other calls; presumably to cover null inputs - confirm.
    this.relayer.allocate(AllocateRequest.newInstance(0, 0, asks, null, null));

    assertAsksAndReleases(1, 0);
    Assert.assertEquals(0, this.relayer.getRemotePendingAsks().size());
  }

  /**
   * Test the full pending resend after RM fails over.
   *
   * <p>After the mock RM rejects a heartbeat as a failover, the next request
   * seen by the RM must carry all still-pending asks, releases and blacklist
   * additions. That is every one sent so far, not just the latest delta.
   */
  @Test
  public void testResendRequestsOnRMRestart()
      throws YarnException, IOException {
    ContainerId c1 = createContainerId(1);
    ContainerId c2 = createContainerId(2);

    // Ask for two containers, one with location preference
    this.asks.add(createResourceRequest(0, "node1", 2048, 1, 1,
        ExecutionType.GUARANTEED, 1));
    this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1,
        ExecutionType.GUARANTEED, 1));
    this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1,
        ExecutionType.GUARANTEED, 2));

    this.releases.add(c1);
    this.blacklistAdditions.add("node1");
    this.blacklistRemoval.add("node0");

    // 1. a fully loaded request
    this.relayer.allocate(getAllocateRequest());
    assertAsksAndReleases(3, 1);
    assertBlacklistAdditionsAndRemovals(1, 1);
    clearAllocateRequestLists();

    // 2. empty request
    this.relayer.allocate(getAllocateRequest());
    assertAsksAndReleases(0, 0);
    assertBlacklistAdditionsAndRemovals(0, 0);
    clearAllocateRequestLists();

    // Set RM restart and failover flag
    this.mockAMS.setFailoverFlag();

    // More requests
    this.blacklistAdditions.add("node2");
    this.releases.add(c2);
    this.relayer.allocate(getAllocateRequest());

    // verify pending requests are fully re-sent: 3 asks, releases c1 + c2,
    // blacklist additions node1 + node2
    assertAsksAndReleases(3, 2);
    assertBlacklistAdditionsAndRemovals(2, 0);
    clearAllocateRequestLists();
  }

  /**
   * Checks responseId handling after RM rejects a heartbeat with an invalid
   * responseId error. The relayer is expected to adopt the responseId RM
   * reported for the next heartbeat only, then go back to using the
   * caller-supplied responseId.
   */
  @Test
  public void testResponseIdResync() throws YarnException, IOException {
    this.responseId = 10;

    AllocateResponse response = this.relayer.allocate(getAllocateRequest());
    Assert.assertEquals(this.responseId + 1, response.getResponseId());

    int expected = 5;
    this.mockAMS.setResponseIdReset(expected);

    try {
      this.relayer.allocate(getAllocateRequest());
      Assert.fail("Expecting exception from RM");
    } catch (InvalidApplicationMasterRequestException ignored) {
      // Expected exception
    }

    // Verify that the responseId is overridden
    response = this.relayer.allocate(getAllocateRequest());
    Assert.assertEquals(expected + 1, response.getResponseId());

    // Verify it is no longer overridden
    this.responseId = response.getResponseId();
    response = this.relayer.allocate(getAllocateRequest());
    Assert.assertEquals(this.responseId + 1, response.getResponseId());
  }

  /**
   * Checks finishApplicationMaster under a simulated RM failover.
   *
   * <p>The mock RM is armed to reject the finish call as not-registered and
   * the following register call as already-registered. The test passes only
   * if finishApplicationMaster completes without propagating an exception.
   */
  @Test
  public void testConcurrentReregister() throws YarnException, IOException {
    // Set RM restart and failover flag
    this.mockAMS.setFailoverFlag();
    this.mockAMS.setThrowAlreadyRegister();

    this.relayer.finishApplicationMaster(null);
  }
}