blob: b269fa45677ee3e198fe2725e04c55d0fdde7121 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
public class TestAMRMProxyService extends BaseAMRMProxyTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestAMRMProxyService.class);
private static MockResourceManagerFacade mockRM;
/**
* Test if the pipeline is created properly.
*/
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root =
super.getAMRMProxyService().createRequestInterceptorChain();
int index = 0;
while (root != null) {
switch (index) {
case 0:
case 1:
case 2:
Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
root.getClass().getName());
break;
case 3:
Assert.assertEquals(MockRequestInterceptor.class.getName(), root
.getClass().getName());
break;
}
root = root.getNextInterceptor();
index++;
}
Assert.assertEquals(
"The number of interceptors in chain does not match",
Integer.toString(4), Integer.toString(index));
}
/**
* Tests registration of a single application master.
*
* @throws Exception
*/
@Test
public void testRegisterOneApplicationMaster() throws Exception {
// The testAppId identifier is used as host name and the mock resource
// manager return it as the queue name. Assert that we received the queue
// name
int testAppId = 1;
RegisterApplicationMasterResponse response1 =
registerApplicationMaster(testAppId);
Assert.assertNotNull(response1);
Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
}
/**
* Tests the case when interceptor pipeline initialization fails.
*
* @throws IOException
*/
@Test
public void testInterceptorInitFailure() throws IOException {
Configuration conf = this.getConf();
// Override with a bad interceptor configuration
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
"class.that.does.not.exist");
// Reinitialize instance with the new config
createAndStartAMRMProxyService(conf);
int testAppId = 1;
try {
registerApplicationMaster(testAppId);
Assert.fail("Should not reach here. Expecting an exception thrown");
} catch (Exception e) {
Map<ApplicationId, RequestInterceptorChainWrapper> pipelines =
getAMRMProxyService().getPipelines();
ApplicationId id = getApplicationId(testAppId);
Assert.assertTrue(
"The interceptor pipeline should be removed if initializtion fails",
pipelines.get(id) == null);
}
}
/**
* Tests the registration of multiple application master serially one at a
* time.
*
* @throws Exception
*/
@Test
public void testRegisterMulitpleApplicationMasters() throws Exception {
for (int testAppId = 0; testAppId < 3; testAppId++) {
RegisterApplicationMasterResponse response =
registerApplicationMaster(testAppId);
Assert.assertNotNull(response);
Assert
.assertEquals(Integer.toString(testAppId), response.getQueue());
}
}
/**
* Tests the registration of multiple application masters using multiple
* threads in parallel.
*
* @throws Exception
*/
@Test
public void testRegisterMulitpleApplicationMastersInParallel()
throws Exception {
int numberOfRequests = 5;
ArrayList<String> testContexts =
CreateTestRequestIdentifiers(numberOfRequests);
super.registerApplicationMastersInParallel(testContexts);
}
private ArrayList<String> CreateTestRequestIdentifiers(
int numberOfRequests) {
ArrayList<String> testContexts = new ArrayList<String>();
LOG.info("Creating " + numberOfRequests + " contexts for testing");
for (int ep = 0; ep < numberOfRequests; ep++) {
testContexts.add("test-endpoint-" + Integer.toString(ep));
LOG.info("Created test context: " + testContexts.get(ep));
}
return testContexts;
}
@Test
public void testFinishOneApplicationMasterWithSuccess() throws Exception {
int testAppId = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId),
registerResponse.getQueue());
FinishApplicationMasterResponse finshResponse =
finishApplicationMaster(testAppId,
FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
}
@Test
public void testFinishOneApplicationMasterWithFailure() throws Exception {
int testAppId = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId),
registerResponse.getQueue());
FinishApplicationMasterResponse finshResponse =
finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
Assert.assertNotNull(finshResponse);
try {
// Try to finish an application master that is already finished.
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
Assert
.fail("The request to finish application master should have failed");
} catch (Throwable ex) {
// This is expected. So nothing required here.
LOG.info("Finish registration failed as expected because it was not registered");
}
}
@Test
public void testFinishInvalidApplicationMaster() throws Exception {
try {
// Try to finish an application master that was not registered.
finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
Assert
.fail("The request to finish application master should have failed");
} catch (Throwable ex) {
// This is expected. So nothing required here.
LOG.info("Finish registration failed as expected because it was not registered");
}
}
@Test
public void testFinishMulitpleApplicationMasters() throws Exception {
int numberOfRequests = 3;
for (int index = 0; index < numberOfRequests; index++) {
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(index);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(index),
registerResponse.getQueue());
}
// Finish in reverse sequence
for (int index = numberOfRequests - 1; index >= 0; index--) {
FinishApplicationMasterResponse finshResponse =
finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
// Assert that the application has been removed from the collection
Assert.assertTrue(this.getAMRMProxyService()
.getPipelines().size() == index);
}
try {
// Try to finish an application master that is already finished.
finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
Assert
.fail("The request to finish application master should have failed");
} catch (Throwable ex) {
// This is expected. So nothing required here.
LOG.info("Finish registration failed as expected because it was not registered");
}
try {
// Try to finish an application master that was not registered.
finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
Assert
.fail("The request to finish application master should have failed");
} catch (Throwable ex) {
// This is expected. So nothing required here.
LOG.info("Finish registration failed as expected because it was not registered");
}
}
@Test
public void testFinishMulitpleApplicationMastersInParallel()
throws Exception {
int numberOfRequests = 5;
ArrayList<String> testContexts = new ArrayList<String>();
LOG.info("Creating " + numberOfRequests + " contexts for testing");
for (int i = 0; i < numberOfRequests; i++) {
testContexts.add("test-endpoint-" + Integer.toString(i));
LOG.info("Created test context: " + testContexts.get(i));
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(i);
Assert.assertNotNull(registerResponse);
Assert
.assertEquals(Integer.toString(i), registerResponse.getQueue());
}
finishApplicationMastersInParallel(testContexts);
}
@Test
public void testAllocateRequestWithNullValues() throws Exception {
int testAppId = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId),
registerResponse.getQueue());
AllocateResponse allocateResponse = allocate(testAppId);
Assert.assertNotNull(allocateResponse);
FinishApplicationMasterResponse finshResponse =
finishApplicationMaster(testAppId,
FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
}
@Test
public void testAllocateRequestWithoutRegistering() throws Exception {
try {
// Try to allocate an application master without registering.
allocate(1);
Assert
.fail("The request to allocate application master should have failed");
} catch (Throwable ex) {
// This is expected. So nothing required here.
LOG.info("AllocateRequest failed as expected because AM was not registered");
}
}
@Test
public void testAllocateWithOneResourceRequest() throws Exception {
int testAppId = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
getContainersAndAssert(testAppId, 1);
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
}
@Test
public void testAllocateWithMultipleResourceRequest() throws Exception {
int testAppId = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
getContainersAndAssert(testAppId, 10);
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
}
@Test
public void testAllocateAndReleaseContainers() throws Exception {
int testAppId = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
List<Container> containers = getContainersAndAssert(testAppId, 10);
releaseContainersAndAssert(testAppId, containers);
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
}
@Test
public void testAllocateAndReleaseContainersForMultipleAM()
throws Exception {
int numberOfApps = 5;
for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
List<Container> containers = getContainersAndAssert(testAppId, 10);
releaseContainersAndAssert(testAppId, containers);
}
for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
}
}
@Test
public void testAllocateAndReleaseContainersForMultipleAMInParallel()
throws Exception {
int numberOfApps = 6;
ArrayList<Integer> tempAppIds = new ArrayList<Integer>();
for (int i = 0; i < numberOfApps; i++) {
tempAppIds.add(new Integer(i));
}
final ArrayList<Integer> appIds = tempAppIds;
List<Integer> responses =
runInParallel(appIds, new Function<Integer, Integer>() {
@Override
public Integer invoke(Integer testAppId) {
try {
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull("response is null", registerResponse);
List<Container> containers =
getContainersAndAssert(testAppId, 10);
releaseContainersAndAssert(testAppId, containers);
LOG.info("Sucessfully registered application master with appId: "
+ testAppId);
} catch (Throwable ex) {
LOG.error(
"Failed to register application master with appId: "
+ testAppId, ex);
testAppId = null;
}
return testAppId;
}
});
Assert.assertEquals(
"Number of responses received does not match with request",
appIds.size(), responses.size());
for (Integer testAppId : responses) {
Assert.assertNotNull(testAppId);
finishApplicationMaster(testAppId.intValue(),
FinalApplicationStatus.SUCCEEDED);
}
}
@Test
public void testMultipleAttemptsSameNode()
throws YarnException, IOException, Exception {
String user = "hadoop";
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId;
// First Attempt
RegisterApplicationMasterResponse response1 =
registerApplicationMaster(appId.getId());
Assert.assertNotNull(response1);
AllocateResponse allocateResponse = allocate(appId.getId());
Assert.assertNotNull(allocateResponse);
// Second Attempt
applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
getAMRMProxyService().initializePipeline(applicationAttemptId, user,
new Token<AMRMTokenIdentifier>(), null, null, false, null);
RequestInterceptorChainWrapper chain2 =
getAMRMProxyService().getPipelines().get(appId);
Assert.assertEquals(applicationAttemptId, chain2.getApplicationAttemptId());
allocateResponse = allocate(appId.getId());
Assert.assertNotNull(allocateResponse);
}
private List<Container> getContainersAndAssert(int appId,
int numberOfResourceRequests) throws Exception {
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList =
new ArrayList<ResourceRequest>(numberOfResourceRequests);
for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
askList.add(createResourceRequest(
"test-node-" + Integer.toString(testAppId), 6000, 2,
testAppId % 5, 1));
}
allocateRequest.setAskList(askList);
AllocateResponse allocateResponse = allocate(appId, allocateRequest);
Assert.assertNotNull("allocate() returned null response",
allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
containers.addAll(allocateResponse.getAllocatedContainers());
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
while (containers.size() < askList.size() && numHeartbeat++ < 10) {
allocateResponse =
allocate(appId, Records.newRecord(AllocateRequest.class));
Assert.assertNotNull("allocate() returned null response",
allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers()
.size()));
LOG.info("Total number of allocated containers: "
+ Integer.toString(containers.size()));
Thread.sleep(10);
}
// We broadcast the request, the number of containers we received will be
// higher than we ask
Assert.assertTrue("The asklist count is not same as response",
askList.size() <= containers.size());
return containers;
}
private void releaseContainersAndAssert(int appId,
List<Container> containers) throws Exception {
Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<ContainerId> relList =
new ArrayList<ContainerId>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
}
allocateRequest.setReleaseList(relList);
AllocateResponse allocateResponse = allocate(appId, allocateRequest);
Assert.assertNotNull(allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
// We need to make sure all the resource managers received the
// release list. The containers sent by the mock resource managers will be
// aggregated and returned back to us and we can assert if all the release
// lists reached the sub-clusters
List<ContainerId> containersForReleasedContainerIds = new ArrayList<>();
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
while (containersForReleasedContainerIds.size() < relList.size()
&& numHeartbeat++ < 10) {
allocateResponse =
allocate(appId, Records.newRecord(AllocateRequest.class));
Assert.assertNotNull(allocateResponse);
Assert.assertNull(
"new AMRMToken from RM should have been nulled by AMRMProxyService",
allocateResponse.getAMRMToken());
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers()
.size()));
LOG.info("Total number of containers received: "
+ Integer.toString(containersForReleasedContainerIds.size()));
Thread.sleep(10);
}
Assert.assertEquals(relList.size(),
containersForReleasedContainerIds.size());
}
/**
* Test AMRMProxy restart with recovery.
*/
@Test
public void testRecovery() throws YarnException, Exception {
Configuration conf = createConfiguration();
// Use the MockRequestInterceptorAcrossRestart instead for the chain
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
MockRequestInterceptorAcrossRestart.class.getName());
mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0);
createAndStartAMRMProxyService(conf);
int testAppId1 = 1;
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId1);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId1),
registerResponse.getQueue());
int testAppId2 = 2;
registerResponse = registerApplicationMaster(testAppId2);
Assert.assertNotNull(registerResponse);
Assert.assertEquals(Integer.toString(testAppId2),
registerResponse.getQueue());
AllocateResponse allocateResponse = allocate(testAppId2);
Assert.assertNotNull(allocateResponse);
// At the time of kill, app1 just registerAM, app2 already did one allocate.
// Both application should be recovered
createAndStartAMRMProxyService(conf);
Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2);
allocateResponse = allocate(testAppId1);
Assert.assertNotNull(allocateResponse);
FinishApplicationMasterResponse finshResponse =
finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
allocateResponse = allocate(testAppId2);
Assert.assertNotNull(allocateResponse);
finshResponse =
finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
int testAppId3 = 3;
try {
// Try to finish an application master that is not registered.
finishApplicationMaster(testAppId3, FinalApplicationStatus.SUCCEEDED);
Assert
.fail("The Mock RM should complain about not knowing the third app");
} catch (Throwable ex) {
}
mockRM = null;
}
/**
* Test AMRMProxy restart with application recovery failure.
*/
@Test
public void testAppRecoveryFailure() throws YarnException, Exception {
Configuration conf = createConfiguration();
// Use the MockRequestInterceptorAcrossRestart instead for the chain
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
BadRequestInterceptorAcrossRestart.class.getName());
mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0);
createAndStartAMRMProxyService(conf);
// Create an app entry in NMSS
registerApplicationMaster(1);
RecoveredAMRMProxyState state =
getNMContext().getNMStateStore().loadAMRMProxyState();
Assert.assertEquals(1, state.getAppContexts().size());
// AMRMProxy restarts and recover
createAndStartAMRMProxyService(conf);
state = getNMContext().getNMStateStore().loadAMRMProxyState();
// The app that failed to recover should have been removed from NMSS
Assert.assertEquals(0, state.getAppContexts().size());
}
/**
* A mock intercepter implementation that uses the same mockRM instance across
* restart.
*/
public static class MockRequestInterceptorAcrossRestart
extends AbstractRequestInterceptor {
public MockRequestInterceptorAcrossRestart() {
}
@Override
public void init(AMRMProxyApplicationContext appContext) {
super.init(appContext);
if (mockRM == null) {
throw new RuntimeException("mockRM not initialized yet");
}
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
return mockRM.registerApplicationMaster(request);
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException {
return mockRM.finishApplicationMaster(request);
}
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
return mockRM.allocate(request);
}
}
/**
* A mock intercepter implementation that throws when recovering.
*/
public static class BadRequestInterceptorAcrossRestart
extends MockRequestInterceptorAcrossRestart {
@Override
public void recover(Map<String, byte[]> recoveredDataMap) {
throw new RuntimeException("Kaboom");
}
}
}