blob: 87dfc95cd9e9200c77db25d99bf73ee01ceca673 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the
* {@code FederationInterceptor} class. The tests for
* {@code RouterClientRMService} has been written cleverly so that it can be
* reused to validate different request intercepter chains.
*/
public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationClientInterceptor.class);
private TestableFederationClientInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private List<SubClusterId> subClusters;
private String user = "test-user";
private final static int NUM_SUBCLUSTER = 4;
@Override
public void setUp() {
super.setUpConfig();
interceptor = new TestableFederationClientInterceptor();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
subClusters = new ArrayList<SubClusterId>();
try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
stateStoreUtil.registerSubCluster(sc);
subClusters.add(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain is the federation intercepter that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
/**
* This test validates the correctness of GetNewApplication. The return
* ApplicationId has to belong to one of the SubCluster in the cluster.
*/
@Test
public void testGetNewApplication()
throws YarnException, IOException, InterruptedException {
System.out.println("Test FederationClientInterceptor: Get New Application");
GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
GetNewApplicationResponse response = interceptor.getNewApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(response.getApplicationId());
Assert.assertTrue(
response.getApplicationId().getClusterTimestamp() < NUM_SUBCLUSTER);
Assert.assertTrue(response.getApplicationId().getClusterTimestamp() >= 0);
}
/**
* This test validates the correctness of SubmitApplication. The application
* has to be submitted to one of the SubCluster in the cluster.
*/
@Test
public void testSubmitApplication()
throws YarnException, IOException, InterruptedException {
System.out.println("Test FederationClientInterceptor: Submit Application");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult);
Assert.assertTrue(subClusters.contains(scIdResult));
}
/**
* This test validates the correctness of SubmitApplication in case of
* multiple submission. The first retry has to be submitted to the same
* SubCluster of the first attempt.
*/
@Test
public void testSubmitApplicationMultipleSubmission()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Submit Application - Multiple");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
// First attempt
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult);
// First retry
response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult2);
Assert.assertEquals(scIdResult, scIdResult);
}
/**
* This test validates the correctness of SubmitApplication in case of empty
* request.
*/
@Test
public void testSubmitApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Submit Application - Empty");
try {
interceptor.submitApplication(null);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContex information."));
}
try {
interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContex information."));
}
try {
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(null, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
interceptor.submitApplication(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContex information."));
}
}
/**
* This test validates the correctness of ForceKillApplication in case the
* application exists in the cluster.
*/
@Test
public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {
System.out
.println("Test FederationClientInterceptor: Force Kill Application");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
// Submit the application we are going to kill later
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId);
KillApplicationResponse responseKill =
interceptor.forceKillApplication(requestKill);
Assert.assertNotNull(responseKill);
}
/**
* This test validates the correctness of ForceKillApplication in case of
* application does not exist in StateStore.
*/
@Test
public void testForceKillApplicationNotExists()
throws YarnException, IOException, InterruptedException {
System.out.println("Test FederationClientInterceptor: "
+ "Force Kill Application - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId);
try {
interceptor.forceKillApplication(requestKill);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().equals(
"Application " + appId + " does not exist in FederationStateStore"));
}
}
/**
* This test validates the correctness of ForceKillApplication in case of
* empty request.
*/
@Test
public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Force Kill Application - Empty");
try {
interceptor.forceKillApplication(null);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith(
"Missing forceKillApplication request or ApplicationId."));
}
try {
interceptor
.forceKillApplication(KillApplicationRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith(
"Missing forceKillApplication request or ApplicationId."));
}
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationReport()
throws YarnException, IOException, InterruptedException {
System.out
.println("Test FederationClientInterceptor: Get Application Report");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
// Submit the application we want the report later
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
GetApplicationReportRequest requestGet =
GetApplicationReportRequest.newInstance(appId);
GetApplicationReportResponse responseGet =
interceptor.getApplicationReport(requestGet);
Assert.assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationNotExists()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test ApplicationClientProtocol: Get Application Report - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
GetApplicationReportRequest requestGet =
GetApplicationReportRequest.newInstance(appId);
try {
interceptor.getApplicationReport(requestGet);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().equals(
"Application " + appId + " does not exist in FederationStateStore"));
}
}
/**
* This test validates the correctness of GetApplicationReport in case of
* empty request.
*/
@Test
public void testGetApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Get Application Report - Empty");
try {
interceptor.getApplicationReport(null);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing getApplicationReport request or "
+ "applicationId information."));
}
try {
interceptor
.getApplicationReport(GetApplicationReportRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing getApplicationReport request or "
+ "applicationId information."));
}
}
}