blob: f98d90a6f340b187f73c7123f479edbb0d4f9aab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED;
/**
* End-to-end tests that deploy services with MiniYarnCluster and an in-JVM
* ZK testing cluster.
*/
public class TestYarnNativeServices extends ServiceTestUtils {
// Logger used by the tests and helper methods of this class.
private static final Logger LOG =
LoggerFactory.getLogger(TestYarnNativeServices.class);
// JUnit TemporaryFolder rule.
// NOTE(review): no code visible in this class reads tmpFolder - confirm it is
// still needed.
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
/**
 * Removes the {@code target/tmp} directory (relative to the working
 * directory) before each test, using a quiet delete.
 */
@Before
public void setup() throws Exception {
  FileUtils.deleteQuietly(new File("target", "tmp"));
}
// Runs after each test and delegates to shutdown(), which is not defined in
// this class (presumably inherited from ServiceTestUtils - confirm).
@After
public void tearDown() throws IOException {
shutdown();
}
/**
 * End-to-end test that drives a service through its lifecycle with
 * ServiceClient.
 *
 * 1. Create the example service (per the original notes: 2 components with
 *    2 containers each; the definition lives outside this class) and check
 *    that its NAME.json spec was persisted.
 * 2. Flex every component to 3, then 1, then 2 containers; after each flex
 *    wait for all containers to be ready and check the instance names.
 * 3. Stop the service and verify the application finished with status ENDED.
 * 4. Destroy the service and verify its directory is gone.
 */
@Test (timeout = 200000)
public void testCreateFlexStopDestroyService() throws Exception {
  setupInternal(NUM_NMS);
  ServiceClient serviceClient = createClient();
  Service service = createExampleApplication();
  serviceClient.actionCreate(service);

  // The service spec must be persisted as NAME.json inside the service dir.
  Path serviceDir =
      new SliderFileSystem(getConf()).buildClusterDirPath(service.getName());
  Path persistedSpec = new Path(serviceDir, service.getName() + ".json");
  Assert.assertTrue(getFS().exists(persistedSpec));
  waitForAllCompToBeReady(serviceClient, service);

  // Flex up to 3, down to 1 and up again to 2; each round must settle with
  // sequentially named component instances.
  for (long targetCount : new long[] {3L, 1L, 2L}) {
    flexComponents(serviceClient, service, targetCount);
    waitForAllCompToBeReady(serviceClient, service);
    checkCompInstancesInOrder(serviceClient, service);
  }

  LOG.info("Stop the service");
  serviceClient.actionStop(service.getName(), true);
  ApplicationId appId = ApplicationId.fromString(service.getId());
  ApplicationReport finalReport =
      serviceClient.getYarnClient().getApplicationReport(appId);
  // The AM must have unregistered from the RM successfully.
  Assert.assertEquals(FINISHED, finalReport.getYarnApplicationState());
  Assert.assertEquals(FinalApplicationStatus.ENDED,
      finalReport.getFinalApplicationStatus());

  LOG.info("Destroy the service");
  serviceClient.actionDestroy(service.getName());
  // Destroying the service must remove its directory from the file system.
  Assert.assertFalse(getFS().exists(serviceDir));
}
/**
 * Verifies that a component is started only after the component it depends
 * on: "compb" (2 containers) declares a dependency on "compa" (2 containers),
 * so all compa containers must have launched before any compb container.
 */
@Test (timeout = 200000)
public void testComponentStartOrder() throws Exception {
  setupInternal(NUM_NMS);
  ServiceClient serviceClient = createClient();

  Service service = new Service();
  service.setName("teststartorder");
  Component prerequisite = createComponent("compa", 2, "sleep 1000");
  service.addComponent(prerequisite);
  // compb depends on compa.
  Component dependent = createComponent("compb", 2, "sleep 1000");
  dependent.setDependencies(Collections.singletonList("compa"));
  service.addComponent(dependent);

  serviceClient.actionCreate(service);
  waitForAllCompToBeReady(serviceClient, service);

  // Containers of compa must have been launched before those of compb.
  checkContainerLaunchDependencies(serviceClient, service, "compa", "compb");

  serviceClient.actionStop(service.getName(), true);
  serviceClient.actionDestroy(service.getName());
}
/**
 * Verifies recovery of the ServiceMaster after the RM is restarted.
 *
 * 1. Create the example service with RM recovery enabled.
 * 2. Restart the resource manager and wait for the node managers.
 * 3. Fail the current application attempt.
 * 4. Wait for attempt 2 to be RUNNING and check that every component still
 *    reports the same container ids as before the failure.
 */
@Test(timeout = 200000)
public void testRecoverComponentsAfterRMRestart() throws Exception {
  YarnConfiguration yarnConf = new YarnConfiguration();
  yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  yarnConf.setBoolean(
      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
  yarnConf.setLong(
      YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 500L);
  yarnConf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
  yarnConf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
  setConf(yarnConf);
  setupInternal(NUM_NMS);

  ServiceClient serviceClient = createClient();
  Service service = createExampleApplication();
  serviceClient.actionCreate(service);
  waitForAllCompToBeReady(serviceClient, service);

  LOG.info("Restart the resource manager");
  int activeRmIndex = getYarnCluster().getActiveRMIndex();
  getYarnCluster().restartResourceManager(activeRmIndex);
  // Poll until the restarted RM reports the STARTED service state.
  GenericTestUtils.waitFor(
      () -> getYarnCluster().getResourceManager().getServiceState()
          == org.apache.hadoop.service.Service.STATE.STARTED,
      2000, 200000);
  boolean nodeManagersConnected =
      getYarnCluster().waitForNodeManagersToConnect(5000);
  Assert.assertTrue("node managers connected", nodeManagersConnected);

  ApplicationId appId = ApplicationId.fromString(service.getId());
  ApplicationAttemptId attemptToFail = serviceClient.getYarnClient()
      .getApplicationReport(appId).getCurrentApplicationAttemptId();
  // Snapshot of component -> container ids taken before the attempt fails.
  Multimap<String, String> containersBeforeFailure =
      getContainersForAllComp(serviceClient, service);

  LOG.info("Fail the application attempt {}", attemptToFail);
  serviceClient.getYarnClient().failApplicationAttempt(attemptToFail);

  // Wait until attempt 2 is running.
  GenericTestUtils.waitFor(() -> {
    try {
      ApplicationReport currentReport =
          serviceClient.getYarnClient().getApplicationReport(appId);
      return currentReport.getCurrentApplicationAttemptId().getAttemptId() == 2
          && currentReport.getYarnApplicationState()
              == YarnApplicationState.RUNNING;
    } catch (YarnException | IOException e) {
      throw new RuntimeException("while waiting", e);
    }
  }, 2000, 200000);

  Multimap<String, String> containersAfterFailure =
      getContainersForAllComp(serviceClient, service);
  Assert.assertEquals("component container affected by restart",
      containersBeforeFailure, containersAfterFailure);

  LOG.info("Stop/destroy service {}", service);
  serviceClient.actionStop(service.getName(), true);
  serviceClient.actionDestroy(service.getName());
}
/**
 * Checks that containers were launched in dependency order.
 *
 * All containers reported by the service status are collected into one list
 * and sorted by launch time (e.g. compa-c1, compa-c2, compb-c1, compb-c2).
 * Walking that list, the first N1 containers must belong to the first
 * component in {@code compOrder}, the next N2 to the second, and so on, where
 * Ni is the number of containers declared for the i-th component in the
 * retrieved status.
 *
 * @param client client used to fetch the current service status
 * @param exampleApp service whose name is used for the status lookup
 * @param compOrder component names in the order they are expected to launch
 * @throws IOException if the status lookup fails
 * @throws YarnException if the status lookup fails
 */
private void checkContainerLaunchDependencies(ServiceClient client,
    Service exampleApp, String... compOrder)
    throws IOException, YarnException {
  Service retrievedApp = client.getStatus(exampleApp.getName());
  List<Container> containerList = new ArrayList<>();
  for (Component component : retrievedApp.getComponents()) {
    // Other helpers in this class treat getContainers() as nullable, so guard
    // here too instead of failing with a NullPointerException in addAll.
    if (component.getContainers() != null) {
      containerList.addAll(component.getContainers());
    }
  }
  // sort based on launchTime
  containerList.sort(Comparator.comparing(Container::getLaunchTime));
  LOG.info("containerList: {}", containerList);
  // check the containers are in the dependency order.
  int index = 0;
  for (String comp : compOrder) {
    long num = retrievedApp.getComponent(comp).getNumberOfContainers();
    for (long i = 0; i < num; i++) {
      // Fail with a readable message rather than an IndexOutOfBoundsException
      // when fewer containers were reported than the components declare.
      Assert.assertTrue("Expected a container of " + comp
          + " at launch position " + index + " but only "
          + containerList.size() + " containers were reported",
          index < containerList.size());
      String compInstanceName =
          containerList.get(index).getComponentInstanceName();
      // Instance names have the form COMPONENT-INDEX; strip the index suffix.
      String compName =
          compInstanceName.substring(0, compInstanceName.lastIndexOf('-'));
      Assert.assertEquals(comp, compName);
      index++;
    }
  }
}
/**
 * Flexes the components "compa" and "compb" of the given service to
 * {@code count} containers each through the client's REST-based flex call.
 * The in-memory {@code exampleApp} is updated to the same count so that later
 * expectations computed from it match the flexed state.
 *
 * @param client client used to issue the flex request
 * @param exampleApp service to flex; its component counts are updated
 * @param count target number of containers for each component
 * @return the component name to target count map that was submitted
 * @throws YarnException if the flex request fails
 * @throws IOException if the flex request fails
 */
private Map<String, Long> flexComponents(ServiceClient client,
    Service exampleApp, long count) throws YarnException, IOException {
  Map<String, Long> requestedCounts = new HashMap<>();
  for (String compName : new String[] {"compa", "compb"}) {
    requestedCounts.put(compName, count);
    // Keep the local definition in sync with the requested container count.
    exampleApp.getComponent(compName).setNumberOfContainers(count);
  }
  client.flexByRestService(exampleApp.getName(), requestedCounts);
  return requestedCounts;
}
/**
 * Fetches the current status of the service and checks, for every component
 * it reports, that the component instance names are sequential
 * (e.g. compa-0, compa-1, compa-2 for three instances).
 *
 * @param client client used to fetch the current service status
 * @param exampleApp service whose name is used for the status lookup
 * @throws IOException if the status lookup fails
 * @throws YarnException if the status lookup fails
 */
private void checkCompInstancesInOrder(ServiceClient client,
    Service exampleApp) throws IOException, YarnException {
  client.getStatus(exampleApp.getName()).getComponents()
      .forEach(this::checkEachCompInstancesInOrder);
}
// Empty placeholder: performs no checks, and no call to it is visible in this
// class.
// TODO(review): implement the registry / component dir cleanup check or
// remove this method.
private void checkRegistryAndCompDirDeleted() {
}
/**
 * Checks that a component reports exactly the declared number of containers
 * and that their instance names are exactly NAME-0 .. NAME-(n-1), where NAME
 * is the component name and n the declared number of containers.
 *
 * The names are compared as sets against the explicitly generated expected
 * names. Relying on the iteration order of a sorted set of strings would
 * break for 11 or more instances ("comp-10" sorts before "comp-2"), and would
 * also let duplicate instance names go unnoticed.
 *
 * @param component component taken from a retrieved service status
 */
private void checkEachCompInstancesInOrder(Component component) {
  long expectedNumInstances = component.getNumberOfContainers();
  Assert.assertEquals(expectedNumInstances, component.getContainers().size());

  Set<String> expectedNames = new TreeSet<>();
  for (long i = 0; i < expectedNumInstances; i++) {
    expectedNames.add(component.getName() + "-" + i);
  }
  Set<String> actualNames = new TreeSet<>();
  for (Container container : component.getContainers()) {
    actualNames.add(container.getComponentInstanceName());
  }
  Assert.assertEquals(
      "Unexpected instance names for component " + component.getName(),
      expectedNames, actualNames);
}
/**
 * Polls the service status every 2 seconds, for up to 200 seconds, until the
 * given component reports as many containers as {@code exampleApp} declares
 * for it. Only the container count is checked, not the container state.
 *
 * A failed status lookup is logged and treated as "not ready yet", so the
 * polling continues until the timeout.
 *
 * @param client client used to fetch the current service status
 * @param exampleApp service definition holding the expected container count
 * @param readyComp name of the component to wait for
 * @throws TimeoutException if the component is not ready within the timeout
 * @throws InterruptedException if the wait is interrupted
 */
private void waitForOneCompToBeReady(ServiceClient client,
    Service exampleApp, String readyComp)
    throws TimeoutException, InterruptedException {
  long numExpectedContainers =
      exampleApp.getComponent(readyComp).getNumberOfContainers();
  GenericTestUtils.waitFor(() -> {
    try {
      Service retrievedApp = client.getStatus(exampleApp.getName());
      Component retrievedComp = retrievedApp.getComponent(readyComp);
      // A component missing from the status is simply not ready yet.
      if (retrievedComp != null && retrievedComp.getContainers() != null
          && retrievedComp.getContainers().size() == numExpectedContainers) {
        LOG.info("{} found {} containers running", readyComp,
            numExpectedContainers);
        return true;
      }
      LOG.info(" Waiting for {}'s containers to be running", readyComp);
      return false;
    } catch (Exception e) {
      // Best effort: report through the logger and retry on the next poll.
      LOG.warn("Failed to get status of service {} while waiting for {}",
          exampleApp.getName(), readyComp, e);
      return false;
    }
  }, 2000, 200000);
}
/**
 * Polls the service status every 2 seconds, for up to 200 seconds, until all
 * containers of all components are in the READY state.
 *
 * A component's containers are counted only once the component reports
 * exactly the number of containers that {@code exampleApp} declares for it;
 * the wait ends when the number of READY containers equals the total declared
 * in {@code exampleApp}. A failed status lookup is logged and treated as
 * "not ready yet".
 *
 * @param client client used to fetch the current service status
 * @param exampleApp service definition holding the expected container counts
 * @throws TimeoutException if the service is not ready within the timeout
 * @throws InterruptedException if the wait is interrupted
 */
private void waitForAllCompToBeReady(ServiceClient client,
    Service exampleApp) throws TimeoutException, InterruptedException {
  int expectedTotalContainers = countTotalContainers(exampleApp);
  GenericTestUtils.waitFor(() -> {
    try {
      Service retrievedApp = client.getStatus(exampleApp.getName());
      int totalReadyContainers = 0;
      LOG.info("Num Components {}", retrievedApp.getComponents().size());
      for (Component component : retrievedApp.getComponents()) {
        LOG.info("looking for {}", component.getName());
        LOG.info(component.toString());
        if (component.getContainers() == null) {
          continue;
        }
        long expectedForComp = exampleApp.getComponent(component.getName())
            .getNumberOfContainers();
        if (component.getContainers().size() != expectedForComp) {
          // Still flexing: ignore this component until its count settles.
          LOG.info("{} Expected number of containers {}, current = {}",
              component.getName(), expectedForComp,
              component.getContainers());
          continue;
        }
        for (Container container : component.getContainers()) {
          LOG.info("Container state {}, component {}", container.getState(),
              component.getName());
          if (container.getState() == ContainerState.READY) {
            totalReadyContainers++;
            LOG.info("Found 1 ready container {}", container.getId());
          }
        }
      }
      LOG.info("Exit loop, totalReadyContainers= {} expected = {}",
          totalReadyContainers, expectedTotalContainers);
      return totalReadyContainers == expectedTotalContainers;
    } catch (Exception e) {
      // Best effort: report through the logger and retry on the next poll.
      LOG.warn("Failed to check readiness of service {}",
          exampleApp.getName(), e);
      return false;
    }
  }, 2000, 200000);
}
/**
 * Collects the container ids of a service, grouped by component name, from
 * the current service status. Components that report no container list
 * (null) contribute nothing.
 *
 * @param client client used to fetch the current service status
 * @param example service whose name is used for the status lookup
 * @return multimap from component name to the ids of its containers
 * @throws IOException if the status lookup fails
 * @throws YarnException if the status lookup fails
 */
private Multimap<String, String> getContainersForAllComp(ServiceClient client,
    Service example) throws IOException, YarnException {
  Multimap<String, String> containerIdsByComp = HashMultimap.create();
  Service status = client.getStatus(example.getName());
  for (Component comp : status.getComponents()) {
    if (comp.getContainers() == null) {
      continue;
    }
    for (Container container : comp.getContainers()) {
      containerIdsByComp.put(comp.getName(), container.getId());
    }
  }
  return containerIdsByComp;
}
/**
 * Creates, initializes (with {@code getConf()}) and starts a ServiceClient
 * whose {@code addJarResource} is overridden to do nothing and return null.
 *
 * @return the started client
 * @throws Exception if the client cannot be created or started
 */
private ServiceClient createClient() throws Exception {
  ServiceClient localJarClient = new ServiceClient() {
    @Override
    protected Path addJarResource(String appName,
        Map<String, LocalResource> localResources)
        throws IOException, SliderException {
      // do nothing, the Unit test will use local jars
      return null;
    }
  };
  localJarClient.init(getConf());
  localJarClient.start();
  return localJarClient;
}
/**
 * Sums the declared number of containers over all components of a service.
 *
 * @param service service definition to inspect
 * @return total number of containers declared by the service, narrowed to int
 */
private int countTotalContainers(Service service) {
  long declaredTotal = service.getComponents().stream()
      .mapToLong(Component::getNumberOfContainers)
      .sum();
  return (int) declaredTotal;
}
}