| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.service; |
| |
| import com.google.common.collect.HashMultimap; |
| import com.google.common.collect.Multimap; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.yarn.api.records.*; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.service.api.records.Service; |
| import org.apache.hadoop.yarn.service.api.records.Component; |
| import org.apache.hadoop.yarn.service.api.records.Container; |
| import org.apache.hadoop.yarn.service.api.records.ContainerState; |
| import org.apache.hadoop.yarn.service.client.ServiceClient; |
| import org.apache.hadoop.yarn.service.exceptions.SliderException; |
| import org.apache.hadoop.yarn.service.utils.SliderFileSystem; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.*; |
| import java.util.concurrent.TimeoutException; |
| |
| import static org.apache.hadoop.yarn.api.records.YarnApplicationState.FINISHED; |
| |
| /** |
| * End to end tests to test deploying services with MiniYarnCluster and a in-JVM |
| * ZK testing cluster. |
| */ |
| public class TestYarnNativeServices extends ServiceTestUtils { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestYarnNativeServices.class); |
| |
| @Rule |
| public TemporaryFolder tmpFolder = new TemporaryFolder(); |
| |
| @Before |
| public void setup() throws Exception { |
| File tmpYarnDir = new File("target", "tmp"); |
| FileUtils.deleteQuietly(tmpYarnDir); |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| shutdown(); |
| } |
| |
| // End-to-end test to use ServiceClient to deploy a service. |
| // 1. Create a service with 2 components, each of which has 2 containers |
| // 2. Flex up each component to 3 containers and check the component instance names |
| // 3. Flex down each component to 1 container and check the component instance names |
| // 4. Flex up each component to 2 containers and check the component instance names |
| // 5. Stop the service |
| // 6. Destroy the service |
| @Test (timeout = 200000) |
| public void testCreateFlexStopDestroyService() throws Exception { |
| setupInternal(NUM_NMS); |
| ServiceClient client = createClient(); |
| Service exampleApp = createExampleApplication(); |
| client.actionCreate(exampleApp); |
| SliderFileSystem fileSystem = new SliderFileSystem(getConf()); |
| Path appDir = fileSystem.buildClusterDirPath(exampleApp.getName()); |
| // check app.json is persisted. |
| Assert.assertTrue( |
| getFS().exists(new Path(appDir, exampleApp.getName() + ".json"))); |
| waitForAllCompToBeReady(client, exampleApp); |
| |
| // Flex two components, each from 2 container to 3 containers. |
| flexComponents(client, exampleApp, 3L); |
| // wait for flex to be completed, increase from 2 to 3 containers. |
| waitForAllCompToBeReady(client, exampleApp); |
| // check all instances name for each component are in sequential order. |
| checkCompInstancesInOrder(client, exampleApp); |
| |
| // flex down to 1 |
| flexComponents(client, exampleApp, 1L); |
| waitForAllCompToBeReady(client, exampleApp); |
| checkCompInstancesInOrder(client, exampleApp); |
| |
| // check component dir and registry are cleaned up. |
| |
| // flex up again to 2 |
| flexComponents(client, exampleApp, 2L); |
| waitForAllCompToBeReady(client, exampleApp); |
| checkCompInstancesInOrder(client, exampleApp); |
| |
| // stop the service |
| LOG.info("Stop the service"); |
| client.actionStop(exampleApp.getName(), true); |
| ApplicationReport report = client.getYarnClient() |
| .getApplicationReport(ApplicationId.fromString(exampleApp.getId())); |
| // AM unregisters with RM successfully |
| Assert.assertEquals(FINISHED, report.getYarnApplicationState()); |
| Assert.assertEquals(FinalApplicationStatus.ENDED, |
| report.getFinalApplicationStatus()); |
| |
| LOG.info("Destroy the service"); |
| //destroy the service and check the app dir is deleted from fs. |
| client.actionDestroy(exampleApp.getName()); |
| // check the service dir on hdfs (in this case, local fs) are deleted. |
| Assert.assertFalse(getFS().exists(appDir)); |
| } |
| |
| // Create compa with 2 containers |
| // Create compb with 2 containers which depends on compa |
| // Check containers for compa started before containers for compb |
| @Test (timeout = 200000) |
| public void testComponentStartOrder() throws Exception { |
| setupInternal(NUM_NMS); |
| ServiceClient client = createClient(); |
| Service exampleApp = new Service(); |
| exampleApp.setName("teststartorder"); |
| exampleApp.addComponent(createComponent("compa", 2, "sleep 1000")); |
| Component compb = createComponent("compb", 2, "sleep 1000"); |
| |
| // Let compb depedends on compa; |
| compb.setDependencies(Collections.singletonList("compa")); |
| exampleApp.addComponent(compb); |
| |
| client.actionCreate(exampleApp); |
| waitForAllCompToBeReady(client, exampleApp); |
| |
| // check that containers for compa are launched before containers for compb |
| checkContainerLaunchDependencies(client, exampleApp, "compa", "compb"); |
| |
| client.actionStop(exampleApp.getName(), true); |
| client.actionDestroy(exampleApp.getName()); |
| } |
| |
| // Test to verify recovery of SeviceMaster after RM is restarted. |
| // 1. Create an example service. |
| // 2. Restart RM. |
| // 3. Fail the application attempt. |
| // 4. Verify ServiceMaster recovers. |
| @Test(timeout = 200000) |
| public void testRecoverComponentsAfterRMRestart() throws Exception { |
| YarnConfiguration conf = new YarnConfiguration(); |
| conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); |
| conf.setBoolean( |
| YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); |
| conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, |
| 500L); |
| |
| conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); |
| conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); |
| setConf(conf); |
| setupInternal(NUM_NMS); |
| |
| ServiceClient client = createClient(); |
| Service exampleApp = createExampleApplication(); |
| client.actionCreate(exampleApp); |
| waitForAllCompToBeReady(client, exampleApp); |
| |
| LOG.info("Restart the resource manager"); |
| getYarnCluster().restartResourceManager( |
| getYarnCluster().getActiveRMIndex()); |
| GenericTestUtils.waitFor(() -> |
| getYarnCluster().getResourceManager().getServiceState() == |
| org.apache.hadoop.service.Service.STATE.STARTED, 2000, 200000); |
| Assert.assertTrue("node managers connected", |
| getYarnCluster().waitForNodeManagersToConnect(5000)); |
| |
| ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId()); |
| ApplicationAttemptId applicationAttemptId = client.getYarnClient() |
| .getApplicationReport(exampleAppId).getCurrentApplicationAttemptId(); |
| |
| Multimap<String, String> containersBeforeFailure = getContainersForAllComp( |
| client, exampleApp); |
| |
| LOG.info("Fail the application attempt {}", applicationAttemptId); |
| client.getYarnClient().failApplicationAttempt(applicationAttemptId); |
| //wait until attempt 2 is running |
| GenericTestUtils.waitFor(() -> { |
| try { |
| ApplicationReport ar = client.getYarnClient() |
| .getApplicationReport(exampleAppId); |
| return ar.getCurrentApplicationAttemptId().getAttemptId() == 2 && |
| ar.getYarnApplicationState() == YarnApplicationState.RUNNING; |
| } catch (YarnException | IOException e) { |
| throw new RuntimeException("while waiting", e); |
| } |
| }, 2000, 200000); |
| |
| Multimap<String, String> containersAfterFailure = getContainersForAllComp( |
| client, exampleApp); |
| Assert.assertEquals("component container affected by restart", |
| containersBeforeFailure, containersAfterFailure); |
| |
| LOG.info("Stop/destroy service {}", exampleApp); |
| client.actionStop(exampleApp.getName(), true); |
| client.actionDestroy(exampleApp.getName()); |
| } |
| |
| // Check containers launched are in dependency order |
| // Get all containers into a list and sort based on container launch time e.g. |
| // compa-c1, compa-c2, compb-c1, compb-c2; |
| // check that the container's launch time are align with the dependencies. |
| private void checkContainerLaunchDependencies(ServiceClient client, |
| Service exampleApp, String... compOrder) |
| throws IOException, YarnException { |
| Service retrievedApp = client.getStatus(exampleApp.getName()); |
| List<Container> containerList = new ArrayList<>(); |
| for (Component component : retrievedApp.getComponents()) { |
| containerList.addAll(component.getContainers()); |
| } |
| // sort based on launchTime |
| containerList |
| .sort((o1, o2) -> o1.getLaunchTime().compareTo(o2.getLaunchTime())); |
| LOG.info("containerList: " + containerList); |
| // check the containers are in the dependency order. |
| int index = 0; |
| for (String comp : compOrder) { |
| long num = retrievedApp.getComponent(comp).getNumberOfContainers(); |
| for (int i = 0; i < num; i++) { |
| String compInstanceName = containerList.get(index).getComponentInstanceName(); |
| String compName = |
| compInstanceName.substring(0, compInstanceName.lastIndexOf('-')); |
| Assert.assertEquals(comp, compName); |
| index++; |
| } |
| } |
| } |
| |
| |
| private Map<String, Long> flexComponents(ServiceClient client, |
| Service exampleApp, long count) throws YarnException, IOException { |
| Map<String, Long> compCounts = new HashMap<>(); |
| compCounts.put("compa", count); |
| compCounts.put("compb", count); |
| // flex will update the persisted conf to reflect latest number of containers. |
| exampleApp.getComponent("compa").setNumberOfContainers(count); |
| exampleApp.getComponent("compb").setNumberOfContainers(count); |
| client.flexByRestService(exampleApp.getName(), compCounts); |
| return compCounts; |
| } |
| |
| // Check each component's comp instances name are in sequential order. |
| // E.g. If there are two instances compA-1 and compA-2 |
| // When flex up to 4 instances, it should be compA-1 , compA-2, compA-3, compA-4 |
| // When flex down to 3 instances, it should be compA-1 , compA-2, compA-3. |
| private void checkCompInstancesInOrder(ServiceClient client, |
| Service exampleApp) throws IOException, YarnException { |
| Service service = client.getStatus(exampleApp.getName()); |
| for (Component comp : service.getComponents()) { |
| checkEachCompInstancesInOrder(comp); |
| } |
| } |
| |
| private void checkRegistryAndCompDirDeleted() { |
| |
| } |
| |
| private void checkEachCompInstancesInOrder(Component component) { |
| long expectedNumInstances = component.getNumberOfContainers(); |
| Assert.assertEquals(expectedNumInstances, component.getContainers().size()); |
| TreeSet<String> instances = new TreeSet<>(); |
| for (Container container : component.getContainers()) { |
| instances.add(container.getComponentInstanceName()); |
| } |
| |
| int i = 0; |
| for (String s : instances) { |
| Assert.assertEquals(component.getName() + "-" + i, s); |
| i++; |
| } |
| } |
| |
| private void waitForOneCompToBeReady(ServiceClient client, |
| Service exampleApp, String readyComp) |
| throws TimeoutException, InterruptedException { |
| long numExpectedContainers = |
| exampleApp.getComponent(readyComp).getNumberOfContainers(); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| Service retrievedApp = client.getStatus(exampleApp.getName()); |
| Component retrievedComp = retrievedApp.getComponent(readyComp); |
| |
| if (retrievedComp.getContainers() != null |
| && retrievedComp.getContainers().size() == numExpectedContainers) { |
| LOG.info(readyComp + " found " + numExpectedContainers |
| + " containers running"); |
| return true; |
| } else { |
| LOG.info(" Waiting for " + readyComp + "'s containers to be running"); |
| return false; |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| return false; |
| } |
| }, 2000, 200000); |
| } |
| |
| // wait until all the containers for all components become ready state |
| private void waitForAllCompToBeReady(ServiceClient client, |
| Service exampleApp) throws TimeoutException, InterruptedException { |
| int expectedTotalContainers = countTotalContainers(exampleApp); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| Service retrievedApp = client.getStatus(exampleApp.getName()); |
| int totalReadyContainers = 0; |
| LOG.info("Num Components " + retrievedApp.getComponents().size()); |
| for (Component component : retrievedApp.getComponents()) { |
| LOG.info("looking for " + component.getName()); |
| LOG.info(component.toString()); |
| if (component.getContainers() != null) { |
| if (component.getContainers().size() == exampleApp |
| .getComponent(component.getName()).getNumberOfContainers()) { |
| for (Container container : component.getContainers()) { |
| LOG.info( |
| "Container state " + container.getState() + ", component " |
| + component.getName()); |
| if (container.getState() == ContainerState.READY) { |
| totalReadyContainers++; |
| LOG.info("Found 1 ready container " + container.getId()); |
| } |
| } |
| } else { |
| LOG.info(component.getName() + " Expected number of containers " |
| + exampleApp.getComponent(component.getName()) |
| .getNumberOfContainers() + ", current = " + component |
| .getContainers()); |
| } |
| } |
| } |
| LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers |
| + " expected = " + expectedTotalContainers); |
| return totalReadyContainers == expectedTotalContainers; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| return false; |
| } |
| }, 2000, 200000); |
| } |
| |
| /** |
| * Get all containers of a service. |
| */ |
| private Multimap<String, String> getContainersForAllComp(ServiceClient client, |
| Service example) throws IOException, YarnException { |
| |
| Multimap<String, String> allContainers = HashMultimap.create(); |
| Service retrievedApp = client.getStatus(example.getName()); |
| retrievedApp.getComponents().forEach(component -> { |
| if (component.getContainers() != null) { |
| component.getContainers().forEach(container -> { |
| allContainers.put(component.getName(), container.getId()); |
| }); |
| } |
| }); |
| return allContainers; |
| } |
| |
| private ServiceClient createClient() throws Exception { |
| ServiceClient client = new ServiceClient() { |
| @Override protected Path addJarResource(String appName, |
| Map<String, LocalResource> localResources) |
| throws IOException, SliderException { |
| // do nothing, the Unit test will use local jars |
| return null; |
| } |
| }; |
| client.init(getConf()); |
| client.start(); |
| return client; |
| } |
| |
| |
| private int countTotalContainers(Service service) { |
| int totalContainers = 0; |
| for (Component component : service.getComponents()) { |
| totalContainers += component.getNumberOfContainers(); |
| } |
| return totalContainers; |
| } |
| } |