| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.applications.distributedshell; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintWriter; |
| import java.net.InetAddress; |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.ServerSocketUtil; |
| import org.apache.hadoop.util.JarFinder; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; |
| import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.MiniYARNCluster; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestDistributedShell { |
| |
| private static final Log LOG = |
| LogFactory.getLog(TestDistributedShell.class); |
| |
| protected MiniYARNCluster yarnCluster = null; |
| protected YarnConfiguration conf = null; |
| private static final int NUM_NMS = 1; |
| |
| protected final static String APPMASTER_JAR = |
| JarFinder.getJar(ApplicationMaster.class); |
| |
| @Before |
| public void setup() throws Exception { |
| setupInternal(NUM_NMS); |
| } |
| |
| protected void setupInternal(int numNodeManager) throws Exception { |
| |
| LOG.info("Starting up YARN cluster"); |
| |
| conf = new YarnConfiguration(); |
| conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); |
| conf.set("yarn.log.dir", "target"); |
| conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); |
| conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); |
| conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); |
| conf.set("mapreduce.jobhistory.address", |
| "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10)); |
| |
| if (yarnCluster == null) { |
| yarnCluster = |
| new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1, |
| numNodeManager, 1, 1); |
| yarnCluster.init(conf); |
| |
| yarnCluster.start(); |
| |
| waitForNMsToRegister(); |
| |
| URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); |
| if (url == null) { |
| throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath"); |
| } |
| Configuration yarnClusterConfig = yarnCluster.getConfig(); |
| yarnClusterConfig.set("yarn.application.classpath", new File(url.getPath()).getParent()); |
| //write the document to a buffer (not directly to the file, as that |
| //can cause the file being written to get read -which will then fail. |
| ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); |
| yarnClusterConfig.writeXml(bytesOut); |
| bytesOut.close(); |
| //write the bytes to the file in the classpath |
| OutputStream os = new FileOutputStream(new File(url.getPath())); |
| os.write(bytesOut.toByteArray()); |
| os.close(); |
| } |
| FileContext fsContext = FileContext.getLocalFSFileContext(); |
| fsContext |
| .delete( |
| new Path(conf |
| .get("yarn.timeline-service.leveldb-timeline-store.path")), |
| true); |
| try { |
| Thread.sleep(2000); |
| } catch (InterruptedException e) { |
| LOG.info("setup thread sleep interrupted. message=" + e.getMessage()); |
| } |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| if (yarnCluster != null) { |
| try { |
| yarnCluster.stop(); |
| } finally { |
| yarnCluster = null; |
| } |
| } |
| FileContext fsContext = FileContext.getLocalFSFileContext(); |
| fsContext |
| .delete( |
| new Path(conf |
| .get("yarn.timeline-service.leveldb-timeline-store.path")), |
| true); |
| } |
| |
| @Test(timeout=90000) |
| public void testDSShellWithDomain() throws Exception { |
| testDSShell(true); |
| } |
| |
| @Test(timeout=90000) |
| public void testDSShellWithoutDomain() throws Exception { |
| testDSShell(false); |
| } |
| |
| public void testDSShell(boolean haveDomain) throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| if (haveDomain) { |
| String[] domainArgs = { |
| "--domain", |
| "TEST_DOMAIN", |
| "--view_acls", |
| "reader_user reader_group", |
| "--modify_acls", |
| "writer_user writer_group", |
| "--create" |
| }; |
| List<String> argsList = new ArrayList<String>(Arrays.asList(args)); |
| argsList.addAll(Arrays.asList(domainArgs)); |
| args = argsList.toArray(new String[argsList.size()]); |
| } |
| |
| LOG.info("Initializing DS Client"); |
| final Client client = new Client(new Configuration(yarnCluster.getConfig())); |
| boolean initSuccess = client.init(args); |
| Assert.assertTrue(initSuccess); |
| LOG.info("Running DS Client"); |
| final AtomicBoolean result = new AtomicBoolean(false); |
| Thread t = new Thread() { |
| public void run() { |
| try { |
| result.set(client.run()); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| t.start(); |
| |
| YarnClient yarnClient = YarnClient.createYarnClient(); |
| yarnClient.init(new Configuration(yarnCluster.getConfig())); |
| yarnClient.start(); |
| String hostName = NetUtils.getHostname(); |
| |
| boolean verified = false; |
| String errorMessage = ""; |
| while(!verified) { |
| List<ApplicationReport> apps = yarnClient.getApplications(); |
| if (apps.size() == 0 ) { |
| Thread.sleep(10); |
| continue; |
| } |
| ApplicationReport appReport = apps.get(0); |
| if(appReport.getHost().equals("N/A")) { |
| Thread.sleep(10); |
| continue; |
| } |
| errorMessage = |
| "Expected host name to start with '" + hostName + "', was '" |
| + appReport.getHost() + "'. Expected rpc port to be '-1', was '" |
| + appReport.getRpcPort() + "'."; |
| if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) { |
| verified = true; |
| } |
| if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) { |
| break; |
| } |
| } |
| Assert.assertTrue(errorMessage, verified); |
| t.join(); |
| LOG.info("Client run completed. Result=" + result); |
| Assert.assertTrue(result.get()); |
| |
| TimelineDomain domain = null; |
| if (haveDomain) { |
| domain = yarnCluster.getApplicationHistoryServer() |
| .getTimelineStore().getDomain("TEST_DOMAIN"); |
| Assert.assertNotNull(domain); |
| Assert.assertEquals("reader_user reader_group", domain.getReaders()); |
| Assert.assertEquals("writer_user writer_group", domain.getWriters()); |
| } |
| TimelineEntities entitiesAttempts = yarnCluster |
| .getApplicationHistoryServer() |
| .getTimelineStore() |
| .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(), |
| null, null, null, null, null, null, null, null, null); |
| Assert.assertNotNull(entitiesAttempts); |
| Assert.assertEquals(1, entitiesAttempts.getEntities().size()); |
| Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents() |
| .size()); |
| Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType() |
| .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString()); |
| if (haveDomain) { |
| Assert.assertEquals(domain.getId(), |
| entitiesAttempts.getEntities().get(0).getDomainId()); |
| } else { |
| Assert.assertEquals("DEFAULT", |
| entitiesAttempts.getEntities().get(0).getDomainId()); |
| } |
| TimelineEntities entities = yarnCluster |
| .getApplicationHistoryServer() |
| .getTimelineStore() |
| .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null, |
| null, null, null, null, null, null, null, null); |
| Assert.assertNotNull(entities); |
| Assert.assertEquals(2, entities.getEntities().size()); |
| Assert.assertEquals(entities.getEntities().get(0).getEntityType() |
| .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString()); |
| if (haveDomain) { |
| Assert.assertEquals(domain.getId(), |
| entities.getEntities().get(0).getDomainId()); |
| } else { |
| Assert.assertEquals("DEFAULT", |
| entities.getEntities().get(0).getDomainId()); |
| } |
| } |
| |
| /* |
| * NetUtils.getHostname() returns a string in the form "hostname/ip". |
| * Sometimes the hostname we get is the FQDN and sometimes the short name. In |
| * addition, on machines with multiple network interfaces, it runs any one of |
| * the ips. The function below compares the returns values for |
| * NetUtils.getHostname() accounting for the conditions mentioned. |
| */ |
| private boolean checkHostname(String appHostname) throws Exception { |
| |
| String hostname = NetUtils.getHostname(); |
| if (hostname.equals(appHostname)) { |
| return true; |
| } |
| |
| Assert.assertTrue("Unknown format for hostname " + appHostname, |
| appHostname.contains("/")); |
| Assert.assertTrue("Unknown format for hostname " + hostname, |
| hostname.contains("/")); |
| |
| String[] appHostnameParts = appHostname.split("/"); |
| String[] hostnameParts = hostname.split("/"); |
| |
| return (compareFQDNs(appHostnameParts[0], hostnameParts[0]) && checkIPs( |
| hostnameParts[0], hostnameParts[1], appHostnameParts[1])); |
| } |
| |
| private boolean compareFQDNs(String appHostname, String hostname) |
| throws Exception { |
| if (appHostname.equals(hostname)) { |
| return true; |
| } |
| String appFQDN = InetAddress.getByName(appHostname).getCanonicalHostName(); |
| String localFQDN = InetAddress.getByName(hostname).getCanonicalHostName(); |
| return appFQDN.equals(localFQDN); |
| } |
| |
| private boolean checkIPs(String hostname, String localIP, String appIP) |
| throws Exception { |
| |
| if (localIP.equals(appIP)) { |
| return true; |
| } |
| boolean appIPCheck = false; |
| boolean localIPCheck = false; |
| InetAddress[] addresses = InetAddress.getAllByName(hostname); |
| for (InetAddress ia : addresses) { |
| if (ia.getHostAddress().equals(appIP)) { |
| appIPCheck = true; |
| continue; |
| } |
| if (ia.getHostAddress().equals(localIP)) { |
| localIPCheck = true; |
| } |
| } |
| return (appIPCheck && localIPCheck); |
| |
| } |
| |
| @Test(timeout=90000) |
| public void testDSRestartWithPreviousRunningContainers() throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "1", |
| "--shell_command", |
| "sleep 8", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128", |
| "--keep_containers_across_application_attempts" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| Client client = new Client(TestDSFailedAppMaster.class.getName(), |
| new Configuration(yarnCluster.getConfig())); |
| |
| client.init(args); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| |
| LOG.info("Client run completed. Result=" + result); |
| // application should succeed |
| Assert.assertTrue(result); |
| } |
| |
| /* |
| * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. |
| * Set attempt_failures_validity_interval as 2.5 seconds. It will check |
| * how many attempt failures for previous 2.5 seconds. |
| * The application is expected to be successful. |
| */ |
| @Test(timeout=90000) |
| public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "1", |
| "--shell_command", |
| "sleep 8", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128", |
| "--attempt_failures_validity_interval", |
| "2500" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| Configuration conf = yarnCluster.getConfig(); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); |
| Client client = new Client(TestDSSleepingAppMaster.class.getName(), |
| new Configuration(conf)); |
| |
| client.init(args); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| |
| LOG.info("Client run completed. Result=" + result); |
| // application should succeed |
| Assert.assertTrue(result); |
| } |
| |
| /* |
| * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds. |
| * Set attempt_failures_validity_interval as 15 seconds. It will check |
| * how many attempt failure for previous 15 seconds. |
| * The application is expected to be fail. |
| */ |
| @Test(timeout=90000) |
| public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "1", |
| "--shell_command", |
| "sleep 8", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128", |
| "--attempt_failures_validity_interval", |
| "15000" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| Configuration conf = yarnCluster.getConfig(); |
| conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); |
| Client client = new Client(TestDSSleepingAppMaster.class.getName(), |
| new Configuration(conf)); |
| |
| client.init(args); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| |
| LOG.info("Client run completed. Result=" + result); |
| // application should be failed |
| Assert.assertFalse(result); |
| } |
| |
| @Test(timeout=90000) |
| public void testDSShellWithCustomLogPropertyFile() throws Exception { |
| final File basedir = |
| new File("target", TestDistributedShell.class.getName()); |
| final File tmpDir = new File(basedir, "tmpDir"); |
| tmpDir.mkdirs(); |
| final File customLogProperty = new File(tmpDir, "custom_log4j.properties"); |
| if (customLogProperty.exists()) { |
| customLogProperty.delete(); |
| } |
| if(!customLogProperty.createNewFile()) { |
| Assert.fail("Can not create custom log4j property file."); |
| } |
| PrintWriter fileWriter = new PrintWriter(customLogProperty); |
| // set the output to DEBUG level |
| fileWriter.write("log4j.rootLogger=debug,stdout"); |
| fileWriter.close(); |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "3", |
| "--shell_command", |
| "echo", |
| "--shell_args", |
| "HADOOP", |
| "--log_properties", |
| customLogProperty.getAbsolutePath(), |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| |
| //Before run the DS, the default the log level is INFO |
| final Log LOG_Client = |
| LogFactory.getLog(Client.class); |
| Assert.assertTrue(LOG_Client.isInfoEnabled()); |
| Assert.assertFalse(LOG_Client.isDebugEnabled()); |
| final Log LOG_AM = LogFactory.getLog(ApplicationMaster.class); |
| Assert.assertTrue(LOG_AM.isInfoEnabled()); |
| Assert.assertFalse(LOG_AM.isDebugEnabled()); |
| |
| LOG.info("Initializing DS Client"); |
| final Client client = |
| new Client(new Configuration(yarnCluster.getConfig())); |
| boolean initSuccess = client.init(args); |
| Assert.assertTrue(initSuccess); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| LOG.info("Client run completed. Result=" + result); |
| Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10); |
| //After DS is finished, the log level should be DEBUG |
| Assert.assertTrue(LOG_Client.isInfoEnabled()); |
| Assert.assertTrue(LOG_Client.isDebugEnabled()); |
| Assert.assertTrue(LOG_AM.isInfoEnabled()); |
| Assert.assertTrue(LOG_AM.isDebugEnabled()); |
| } |
| |
| public void testDSShellWithCommands() throws Exception { |
| |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--shell_command", |
| "\"echo output_ignored;echo output_expected\"", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| final Client client = |
| new Client(new Configuration(yarnCluster.getConfig())); |
| boolean initSuccess = client.init(args); |
| Assert.assertTrue(initSuccess); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| LOG.info("Client run completed. Result=" + result); |
| List<String> expectedContent = new ArrayList<String>(); |
| expectedContent.add("output_expected"); |
| verifyContainerLog(2, expectedContent, false, ""); |
| } |
| |
| @Test(timeout=90000) |
| public void testDSShellWithMultipleArgs() throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "4", |
| "--shell_command", |
| "echo", |
| "--shell_args", |
| "HADOOP YARN MAPREDUCE HDFS", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| final Client client = |
| new Client(new Configuration(yarnCluster.getConfig())); |
| boolean initSuccess = client.init(args); |
| Assert.assertTrue(initSuccess); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| LOG.info("Client run completed. Result=" + result); |
| List<String> expectedContent = new ArrayList<String>(); |
| expectedContent.add("HADOOP YARN MAPREDUCE HDFS"); |
| verifyContainerLog(4, expectedContent, false, ""); |
| } |
| |
| @Test(timeout=90000) |
| public void testDSShellWithShellScript() throws Exception { |
| final File basedir = |
| new File("target", TestDistributedShell.class.getName()); |
| final File tmpDir = new File(basedir, "tmpDir"); |
| tmpDir.mkdirs(); |
| final File customShellScript = new File(tmpDir, "custom_script.sh"); |
| if (customShellScript.exists()) { |
| customShellScript.delete(); |
| } |
| if (!customShellScript.createNewFile()) { |
| Assert.fail("Can not create custom shell script file."); |
| } |
| PrintWriter fileWriter = new PrintWriter(customShellScript); |
| // set the output to DEBUG level |
| fileWriter.write("echo testDSShellWithShellScript"); |
| fileWriter.close(); |
| System.out.println(customShellScript.getAbsolutePath()); |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "1", |
| "--shell_script", |
| customShellScript.getAbsolutePath(), |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| final Client client = |
| new Client(new Configuration(yarnCluster.getConfig())); |
| boolean initSuccess = client.init(args); |
| Assert.assertTrue(initSuccess); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| LOG.info("Client run completed. Result=" + result); |
| List<String> expectedContent = new ArrayList<String>(); |
| expectedContent.add("testDSShellWithShellScript"); |
| verifyContainerLog(1, expectedContent, false, ""); |
| } |
| |
| @Test(timeout=90000) |
| public void testDSShellWithInvalidArgs() throws Exception { |
| Client client = new Client(new Configuration(yarnCluster.getConfig())); |
| |
| LOG.info("Initializing DS Client with no args"); |
| try { |
| client.init(new String[]{}); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("No args")); |
| } |
| |
| LOG.info("Initializing DS Client with no jar file"); |
| try { |
| String[] args = { |
| "--num_containers", |
| "2", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128" |
| }; |
| client.init(args); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("No jar")); |
| } |
| |
| LOG.info("Initializing DS Client with no shell command"); |
| try { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128" |
| }; |
| client.init(args); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("No shell command")); |
| } |
| |
| LOG.info("Initializing DS Client with invalid no. of containers"); |
| try { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "-1", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128" |
| }; |
| client.init(args); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("Invalid no. of containers")); |
| } |
| |
| LOG.info("Initializing DS Client with invalid no. of vcores"); |
| try { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "-2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| client.init(args); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("Invalid virtual cores specified")); |
| } |
| |
| LOG.info("Initializing DS Client with --shell_command and --shell_script"); |
| try { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1", |
| "--shell_script", |
| "test.sh" |
| }; |
| client.init(args); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("Can not specify shell_command option " + |
| "and shell_script option at the same time")); |
| } |
| |
| LOG.info("Initializing DS Client without --shell_command and --shell_script"); |
| try { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1" |
| }; |
| client.init(args); |
| Assert.fail("Exception is expected"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue("The throw exception is not expected", |
| e.getMessage().contains("No shell command or shell script specified " + |
| "to be executed by application master")); |
| } |
| } |
| |
| protected void waitForNMsToRegister() throws Exception { |
| int sec = 60; |
| while (sec >= 0) { |
| if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() |
| >= NUM_NMS) { |
| break; |
| } |
| Thread.sleep(1000); |
| sec--; |
| } |
| } |
| |
| @Test(timeout=90000) |
| public void testContainerLaunchFailureHandling() throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--container_memory", |
| "128" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| Client client = new Client(ContainerLaunchFailAppMaster.class.getName(), |
| new Configuration(yarnCluster.getConfig())); |
| boolean initSuccess = client.init(args); |
| Assert.assertTrue(initSuccess); |
| LOG.info("Running DS Client"); |
| boolean result = client.run(); |
| |
| LOG.info("Client run completed. Result=" + result); |
| Assert.assertFalse(result); |
| |
| } |
| |
| @Test(timeout=90000) |
| public void testDebugFlag() throws Exception { |
| String[] args = { |
| "--jar", |
| APPMASTER_JAR, |
| "--num_containers", |
| "2", |
| "--shell_command", |
| Shell.WINDOWS ? "dir" : "ls", |
| "--master_memory", |
| "512", |
| "--master_vcores", |
| "2", |
| "--container_memory", |
| "128", |
| "--container_vcores", |
| "1", |
| "--debug" |
| }; |
| |
| LOG.info("Initializing DS Client"); |
| Client client = new Client(new Configuration(yarnCluster.getConfig())); |
| Assert.assertTrue(client.init(args)); |
| LOG.info("Running DS Client"); |
| Assert.assertTrue(client.run()); |
| } |
| |
| private int verifyContainerLog(int containerNum, |
| List<String> expectedContent, boolean count, String expectedWord) { |
| File logFolder = |
| new File(yarnCluster.getNodeManager(0).getConfig() |
| .get(YarnConfiguration.NM_LOG_DIRS, |
| YarnConfiguration.DEFAULT_NM_LOG_DIRS)); |
| |
| File[] listOfFiles = logFolder.listFiles(); |
| int currentContainerLogFileIndex = -1; |
| for (int i = listOfFiles.length - 1; i >= 0; i--) { |
| if (listOfFiles[i].listFiles().length == containerNum + 1) { |
| currentContainerLogFileIndex = i; |
| break; |
| } |
| } |
| Assert.assertTrue(currentContainerLogFileIndex != -1); |
| File[] containerFiles = |
| listOfFiles[currentContainerLogFileIndex].listFiles(); |
| |
| int numOfWords = 0; |
| for (int i = 0; i < containerFiles.length; i++) { |
| for (File output : containerFiles[i].listFiles()) { |
| if (output.getName().trim().contains("stdout")) { |
| BufferedReader br = null; |
| List<String> stdOutContent = new ArrayList<String>(); |
| try { |
| |
| String sCurrentLine; |
| br = new BufferedReader(new FileReader(output)); |
| int numOfline = 0; |
| while ((sCurrentLine = br.readLine()) != null) { |
| if (count) { |
| if (sCurrentLine.contains(expectedWord)) { |
| numOfWords++; |
| } |
| } else if (output.getName().trim().equals("stdout")){ |
| if (! Shell.WINDOWS) { |
| Assert.assertEquals("The current is" + sCurrentLine, |
| expectedContent.get(numOfline), sCurrentLine.trim()); |
| numOfline++; |
| } else { |
| stdOutContent.add(sCurrentLine.trim()); |
| } |
| } |
| } |
| /* By executing bat script using cmd /c, |
| * it will output all contents from bat script first |
| * It is hard for us to do check line by line |
| * Simply check whether output from bat file contains |
| * all the expected messages |
| */ |
| if (Shell.WINDOWS && !count |
| && output.getName().trim().equals("stdout")) { |
| Assert.assertTrue(stdOutContent.containsAll(expectedContent)); |
| } |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } finally { |
| try { |
| if (br != null) |
| br.close(); |
| } catch (IOException ex) { |
| ex.printStackTrace(); |
| } |
| } |
| } |
| } |
| } |
| return numOfWords; |
| } |
| } |
| |