blob: e7b28b2f416286a650f797d3876bc84ae5301781 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
* to use the {@code RouterWebServices} pipeline test cases for testing the
* {@code FederationInterceptorREST} class. The tests for
* {@code RouterWebServices} has been written cleverly so that it can be reused
* to validate different request interceptor chains.
* <p>
* It tests the case with SubClusters down and the Router logic of retries. We
* have 1 good SubCluster and 2 bad ones for all the tests.
*/
public class TestFederationInterceptorRESTRetry
extends BaseRouterWebServicesTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationInterceptorRESTRetry.class);
private static final int SERVICE_UNAVAILABLE = 503;
private static final int ACCEPTED = 202;
private static final int OK = 200;
// running and registered
private static SubClusterId good;
// registered but not running
private static SubClusterId bad1;
private static SubClusterId bad2;
private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
private TestableFederationInterceptorREST interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private String user = "test-user";
@Override
public void setUp() {
super.setUpConfig();
interceptor = new TestableFederationInterceptorREST();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
// Create SubClusters
good = SubClusterId.newInstance("1");
bad1 = SubClusterId.newInstance("2");
bad2 = SubClusterId.newInstance("3");
scs.add(good);
scs.add(bad1);
scs.add(bad2);
// The mock RM will not start in these SubClusters, this is done to simulate
// a SubCluster down
interceptor.registerBadSubCluster(bad1);
interceptor.registerBadSubCluster(bad2);
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
private void setupCluster(List<SubClusterId> scsToRegister)
throws YarnException {
try {
// Clean up the StateStore before every test
stateStoreUtil.deregisterAllSubClusters();
for (SubClusterId sc : scsToRegister) {
stateStoreUtil.registerSubCluster(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
MockDefaultRequestInterceptorREST.class.getName());
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain is the federation intercepter that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + ","
+ TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 1 bad SubCluster.
*/
@Test
public void testGetNewApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 2 bad SubClusters.
*/
@Test
public void testGetNewApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 1 bad SubCluster and 1 good one.
*/
@Test
public void testGetNewApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2));
Response response = interceptor.createNewApplication(null);
Assert.assertEquals(OK, response.getStatus());
NewApplication newApp = (NewApplication) response.getEntity();
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
Assert.assertEquals(Integer.parseInt(good.getId()),
appId.getClusterTimestamp());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 1 bad SubCluster.
*/
@Test
public void testSubmitApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 2 bad SubClusters.
*/
@Test
public void testSubmitApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
response.getEntity());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testSubmitApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2));
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContextInfo context =
new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
Response response = interceptor.submitApplication(context, null);
Assert.assertEquals(ACCEPTED, response.getStatus());
Assert.assertEquals(good,
stateStore
.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId))
.getApplicationHomeSubCluster().getHomeSubCluster());
}
/**
* This test validates the correctness of GetApps in case the cluster is
* composed of only 1 bad SubCluster.
*/
@Test
public void testGetAppsOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
Assert.assertNull(response);
}
/**
* This test validates the correctness of GetApps in case the cluster is
* composed of only 2 bad SubClusters.
*/
@Test
public void testGetAppsTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
Assert.assertNull(response);
}
/**
* This test validates the correctness of GetApps in case the cluster is
* composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testGetAppsOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
null, null, null, null, null, null, null, null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getApps().size());
}
/**
* This test validates the correctness of GetNode in case the cluster is
* composed of only 1 bad SubCluster.
*/
@Test
public void testGetNodeOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
try {
interceptor.getNode("testGetNodeOneBadSC");
Assert.fail();
} catch (NotFoundException e) {
Assert.assertTrue(
e.getMessage().contains("nodeId, testGetNodeOneBadSC, is not found"));
}
}
/**
* This test validates the correctness of GetNode in case the cluster is
* composed of only 2 bad SubClusters.
*/
@Test
public void testGetNodeTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
try {
interceptor.getNode("testGetNodeTwoBadSCs");
Assert.fail();
} catch (NotFoundException e) {
Assert.assertTrue(e.getMessage()
.contains("nodeId, testGetNodeTwoBadSCs, is not found"));
}
}
/**
* This test validates the correctness of GetNode in case the cluster is
* composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testGetNodeOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
NodeInfo response = interceptor.getNode(null);
Assert.assertNotNull(response);
// Check if the only node came from Good SubCluster
Assert.assertEquals(good.getId(),
Long.toString(response.getLastHealthUpdate()));
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 1 bad SubCluster.
*/
@Test
public void testGetNodesOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getNodes().size());
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 2 bad SubClusters.
*/
@Test
public void testGetNodesTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getNodes().size());
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testGetNodesOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getNodes().size());
// Check if the only node came from Good SubCluster
Assert.assertEquals(good.getId(),
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));
// The remove duplicate operations is tested in TestRouterWebServiceUtil
}
/**
* This test validates the correctness of GetNodes in case the cluster is
* composed of only 1 bad SubCluster. The excepted result would be a
* ClusterMetricsInfo with all its values set to 0.
*/
@Test
public void testGetClusterMetricsOneBadSC()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad2));
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(response);
// check if we got an empty metrics
checkEmptyMetrics(response);
}
/**
* This test validates the correctness of GetClusterMetrics in case the
* cluster is composed of only 2 bad SubClusters. The excepted result would be
* a ClusterMetricsInfo with all its values set to 0.
*/
@Test
public void testGetClusterMetricsTwoBadSCs()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(bad1, bad2));
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(response);
// check if we got an empty metrics
Assert.assertEquals(0, response.getAppsSubmitted());
}
/**
* This test validates the correctness of GetClusterMetrics in case the
* cluster is composed of only 1 bad SubCluster and a good one. The good
* SubCluster provided a ClusterMetricsInfo with appsSubmitted set to its
* SubClusterId. The expected result would be appSubmitted equals to its
* SubClusterId. SubClusterId in this case is an integer.
*/
@Test
public void testGetClusterMetricsOneBadOneGood()
throws YarnException, IOException, InterruptedException {
setupCluster(Arrays.asList(good, bad2));
ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
Assert.assertNotNull(response);
checkMetricsFromGoodSC(response);
// The merge operations is tested in TestRouterWebServiceUtil
}
private void checkMetricsFromGoodSC(ClusterMetricsInfo response) {
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsSubmitted());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsCompleted());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsPending());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsRunning());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsFailed());
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getAppsKilled());
}
private void checkEmptyMetrics(ClusterMetricsInfo response) {
Assert.assertEquals(0, response.getAppsSubmitted());
Assert.assertEquals(0, response.getAppsCompleted());
Assert.assertEquals(0, response.getAppsPending());
Assert.assertEquals(0, response.getAppsRunning());
Assert.assertEquals(0, response.getAppsFailed());
Assert.assertEquals(0, response.getAppsKilled());
Assert.assertEquals(0, response.getReservedMB());
Assert.assertEquals(0, response.getAvailableMB());
Assert.assertEquals(0, response.getAllocatedMB());
Assert.assertEquals(0, response.getReservedVirtualCores());
Assert.assertEquals(0, response.getAvailableVirtualCores());
Assert.assertEquals(0, response.getAllocatedVirtualCores());
Assert.assertEquals(0, response.getContainersAllocated());
Assert.assertEquals(0, response.getReservedContainers());
Assert.assertEquals(0, response.getPendingContainers());
Assert.assertEquals(0, response.getTotalMB());
Assert.assertEquals(0, response.getTotalVirtualCores());
Assert.assertEquals(0, response.getTotalNodes());
Assert.assertEquals(0, response.getLostNodes());
Assert.assertEquals(0, response.getUnhealthyNodes());
Assert.assertEquals(0, response.getDecommissioningNodes());
Assert.assertEquals(0, response.getDecommissionedNodes());
Assert.assertEquals(0, response.getRebootedNodes());
Assert.assertEquals(0, response.getActiveNodes());
Assert.assertEquals(0, response.getShutdownNodes());
}
}