| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.StringReader; |
| import java.net.URI; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.jar.JarOutputStream; |
| import java.util.zip.ZipEntry; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.FailingMapper; |
| import org.apache.hadoop.RandomTextWriterJob; |
| import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat; |
| import org.apache.hadoop.fs.viewfs.ConfigUtil; |
| import org.apache.hadoop.mapreduce.SleepJob; |
| import org.apache.hadoop.mapreduce.SleepJob.SleepMapper; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.TaskLog; |
| import org.apache.hadoop.mapreduce.Counters; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobCounter; |
| import org.apache.hadoop.mapreduce.JobPriority; |
| import org.apache.hadoop.mapreduce.JobStatus; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.TaskCompletionEvent; |
| import org.apache.hadoop.mapreduce.TaskID; |
| import org.apache.hadoop.mapreduce.TaskReport; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.TypeConverter; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; |
| import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| import org.apache.hadoop.mapreduce.v2.app.MRAppMaster; |
| import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; |
| import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.util.ApplicationClassLoader; |
| import org.apache.hadoop.util.ClassUtil; |
| import org.apache.hadoop.util.JarFinder; |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; |
| import org.apache.log4j.Level; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| |
| public class TestMRJobs { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestMRJobs.class); |
| private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES = |
| EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED); |
| private static final int NUM_NODE_MGRS = 3; |
| private static final String TEST_IO_SORT_MB = "11"; |
| |
| private static final int DEFAULT_REDUCES = 2; |
| protected int numSleepReducers = DEFAULT_REDUCES; |
| |
| protected static MiniMRYarnCluster mrCluster; |
| protected static MiniDFSCluster dfsCluster; |
| |
| private static Configuration conf = new Configuration(); |
| private static FileSystem localFs; |
| private static FileSystem remoteFs; |
| static { |
| try { |
| localFs = FileSystem.getLocal(conf); |
| } catch (IOException io) { |
| throw new RuntimeException("problem getting local fs", io); |
| } |
| } |
| |
| private static Path TEST_ROOT_DIR = localFs.makeQualified( |
| new Path("target", TestMRJobs.class.getName() + "-tmpDir")); |
| static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); |
| private static final String OUTPUT_ROOT_DIR = "/tmp/" + |
| TestMRJobs.class.getSimpleName(); |
| private static final Path TEST_RESOURCES_DIR = new Path(TEST_ROOT_DIR, |
| "localizedResources"); |
| |
| @BeforeClass |
| public static void setup() throws IOException { |
| try { |
| dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) |
| .format(true).racks(null).build(); |
| remoteFs = dfsCluster.getFileSystem(); |
| } catch (IOException io) { |
| throw new RuntimeException("problem starting mini dfs cluster", io); |
| } |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| if (mrCluster == null) { |
| mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), |
| NUM_NODE_MGRS); |
| Configuration conf = new Configuration(); |
| conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS |
| conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); |
| conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); |
| mrCluster.init(conf); |
| mrCluster.start(); |
| } |
| |
| // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to |
| // workaround the absent public discache. |
| localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR); |
| localFs.setPermission(APP_JAR, new FsPermission("700")); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws IOException { |
| if (mrCluster != null) { |
| mrCluster.stop(); |
| mrCluster = null; |
| } |
| if (dfsCluster != null) { |
| dfsCluster.shutdown(); |
| dfsCluster = null; |
| } |
| if (localFs.exists(TEST_RESOURCES_DIR)) { |
| // clean up resource directory |
| localFs.delete(TEST_RESOURCES_DIR, true); |
| } |
| } |
| |
| @After |
| public void resetInit() { |
| numSleepReducers = DEFAULT_REDUCES; |
| } |
| |
| private static void setupJobResourceDirs() throws IOException { |
| if (localFs.exists(TEST_RESOURCES_DIR)) { |
| // clean up directory |
| localFs.delete(TEST_RESOURCES_DIR, true); |
| } |
| |
| localFs.mkdirs(TEST_RESOURCES_DIR); |
| FSDataOutputStream outF1 = null; |
| try { |
| // 10KB file |
| outF1 = localFs.create(new Path(TEST_RESOURCES_DIR, "file1.txt")); |
| outF1.write(new byte[10 * 1024]); |
| } finally { |
| if (outF1 != null) { |
| outF1.close(); |
| } |
| } |
| localFs.createNewFile(new Path(TEST_RESOURCES_DIR, "file2.txt")); |
| Path subDir = new Path(TEST_RESOURCES_DIR, "subDir"); |
| localFs.mkdirs(subDir); |
| FSDataOutputStream outF3 = null; |
| try { |
| // 1MB (plus 10 Bytes) file |
| outF3 = localFs.create(new Path(subDir, "file3.txt")); |
| outF3.write(new byte[(1 * 1024 * 1024) + 10]); |
| } finally { |
| if (outF3 != null) { |
| outF3.close(); |
| } |
| } |
| localFs.createNewFile(new Path(subDir, "file4.txt")); |
| } |
| |
| @Test (timeout = 300000) |
| public void testSleepJob() throws Exception { |
| testSleepJobInternal(false); |
| } |
| |
| @Test (timeout = 300000) |
| public void testSleepJobWithRemoteJar() throws Exception { |
| testSleepJobInternal(true); |
| } |
| |
| @Test(timeout = 300000) |
| public void testSleepJobWithLocalResourceUnderLimit() throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set limits to well above what is expected |
| sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6); |
| sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6); |
| setupJobResourceDirs(); |
| sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); |
| testSleepJobInternal(sleepConf, false, true, null); |
| } |
| |
| @Test(timeout = 300000) |
| public void testSleepJobWithLocalResourceSizeOverLimit() throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set limits to well below what is expected |
| sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 1); |
| setupJobResourceDirs(); |
| sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); |
| testSleepJobInternal(sleepConf, false, false, |
| ResourceViolation.TOTAL_RESOURCE_SIZE); |
| } |
| |
| @Test(timeout = 300000) |
| public void testSleepJobWithLocalResourceNumberOverLimit() throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set limits to well below what is expected |
| sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 1); |
| setupJobResourceDirs(); |
| sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); |
| testSleepJobInternal(sleepConf, false, false, |
| ResourceViolation.NUMBER_OF_RESOURCES); |
| } |
| |
| @Test(timeout = 300000) |
| public void testSleepJobWithLocalResourceCheckAndRemoteJar() |
| throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set limits to well above what is expected |
| sleepConf.setInt(MRJobConfig.MAX_RESOURCES, 6); |
| sleepConf.setLong(MRJobConfig.MAX_RESOURCES_MB, 6); |
| setupJobResourceDirs(); |
| sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); |
| testSleepJobInternal(sleepConf, true, true, null); |
| } |
| |
| @Test(timeout = 300000) |
| public void testSleepJobWithLocalIndividualResourceOverLimit() |
| throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set limits to well below what is expected |
| sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 1); |
| setupJobResourceDirs(); |
| sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); |
| testSleepJobInternal(sleepConf, false, false, |
| ResourceViolation.SINGLE_RESOURCE_SIZE); |
| } |
| |
| @Test(timeout = 300000) |
| public void testSleepJobWithLocalIndividualResourceUnderLimit() |
| throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set limits to well below what is expected |
| sleepConf.setInt(MRJobConfig.MAX_SINGLE_RESOURCE_MB, 2); |
| setupJobResourceDirs(); |
| sleepConf.set("tmpfiles", TEST_RESOURCES_DIR.toString()); |
| testSleepJobInternal(sleepConf, false, true, null); |
| } |
| |
| private void testSleepJobInternal(boolean useRemoteJar) throws Exception { |
| testSleepJobInternal(new Configuration(mrCluster.getConfig()), |
| useRemoteJar, true, null); |
| } |
| |
| private enum ResourceViolation { |
| NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE; |
| } |
| |
| private void testSleepJobInternal(Configuration sleepConf, |
| boolean useRemoteJar, boolean jobSubmissionShouldSucceed, |
| ResourceViolation violation) throws Exception { |
| LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar); |
| |
| if (!jobSubmissionShouldSucceed && violation == null) { |
| Assert.fail("Test is misconfigured. jobSubmissionShouldSucceed is set" |
| + " to false and a ResourceViolation is not specified."); |
| } |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| // set master address to local to test that local mode applied iff framework == local |
| sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); |
| |
| SleepJob sleepJob = new SleepJob(); |
| sleepJob.setConf(sleepConf); |
| |
| // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each: |
| Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1); |
| |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| if (useRemoteJar) { |
| final Path localJar = new Path( |
| ClassUtil.findContainingJar(SleepJob.class)); |
| ConfigUtil.addLink(job.getConfiguration(), "/jobjars", |
| localFs.makeQualified(localJar.getParent()).toUri()); |
| job.setJar("viewfs:///jobjars/" + localJar.getName()); |
| } else { |
| job.setJarByClass(SleepJob.class); |
| } |
| job.setMaxMapAttempts(1); // speed up failures |
| try { |
| job.submit(); |
| Assert.assertTrue("JobSubmission succeeded when it should have failed.", |
| jobSubmissionShouldSucceed); |
| } catch (IOException e) { |
| if (jobSubmissionShouldSucceed) { |
| Assert |
| .fail("Job submission failed when it should have succeeded: " + e); |
| } |
| switch (violation) { |
| case NUMBER_OF_RESOURCES: |
| if (!e.getMessage().contains( |
| "This job has exceeded the maximum number of" |
| + " submitted resources")) { |
| Assert.fail("Test failed unexpectedly: " + e); |
| } |
| break; |
| |
| case TOTAL_RESOURCE_SIZE: |
| if (!e.getMessage().contains( |
| "This job has exceeded the maximum size of submitted resources")) { |
| Assert.fail("Test failed unexpectedly: " + e); |
| } |
| break; |
| |
| case SINGLE_RESOURCE_SIZE: |
| if (!e.getMessage().contains( |
| "This job has exceeded the maximum size of a single submitted")) { |
| Assert.fail("Test failed unexpectedly: " + e); |
| } |
| break; |
| |
| default: |
| Assert.fail("Test failed unexpectedly: " + e); |
| break; |
| } |
| // we are done with the test (job submission failed) |
| return; |
| } |
| String trackingUrl = job.getTrackingURL(); |
| String jobId = job.getJobID().toString(); |
| boolean succeeded = job.waitForCompletion(true); |
| Assert.assertTrue(succeeded); |
| Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); |
| Assert.assertTrue("Tracking URL was " + trackingUrl + |
| " but didn't Match Job ID " + jobId , |
| trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); |
| verifySleepJobCounters(job); |
| verifyTaskProgress(job); |
| |
| // TODO later: add explicit "isUber()" checks of some sort (extend |
| // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value |
| } |
| |
| @Test(timeout = 3000000) |
| public void testJobWithChangePriority() throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // Assumption can be removed when FS priority support is implemented |
| Assume.assumeFalse(sleepConf.get(YarnConfiguration.RM_SCHEDULER) |
| .equals(FairScheduler.class.getCanonicalName())); |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| // set master address to local to test that local mode applied if framework |
| // equals local |
| sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); |
| sleepConf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 5); |
| |
| SleepJob sleepJob = new SleepJob(); |
| sleepJob.setConf(sleepConf); |
| Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1); |
| |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.setJarByClass(SleepJob.class); |
| job.setMaxMapAttempts(1); // speed up failures |
| job.submit(); |
| |
| // Set the priority to HIGH |
| job.setPriority(JobPriority.HIGH); |
| waitForPriorityToUpdate(job, JobPriority.HIGH); |
| // Verify the priority from job itself |
| assertThat(job.getPriority()).isEqualTo(JobPriority.HIGH); |
| |
| // Change priority to NORMAL (3) with new api |
| job.setPriorityAsInteger(3); // Verify the priority from job itself |
| waitForPriorityToUpdate(job, JobPriority.NORMAL); |
| assertThat(job.getPriority()).isEqualTo(JobPriority.NORMAL); |
| |
| // Change priority to a high integer value with new api |
| job.setPriorityAsInteger(89); // Verify the priority from job itself |
| waitForPriorityToUpdate(job, JobPriority.UNDEFINED_PRIORITY); |
| assertThat(job.getPriority()).isEqualTo(JobPriority.UNDEFINED_PRIORITY); |
| |
| boolean succeeded = job.waitForCompletion(true); |
| Assert.assertTrue(succeeded); |
| Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); |
| } |
| |
| @Test(timeout = 300000) |
| public void testJobWithWorkflowPriority() throws Exception { |
| Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| CapacityScheduler scheduler = (CapacityScheduler) mrCluster |
| .getResourceManager().getResourceScheduler(); |
| CapacitySchedulerConfiguration csConf = scheduler.getConfiguration(); |
| csConf.set(CapacitySchedulerConfiguration.WORKFLOW_PRIORITY_MAPPINGS, |
| WorkflowPriorityMappingsManager.getWorkflowPriorityMappingStr( |
| Arrays.asList(new WorkflowPriorityMapping( |
| "wf1", "root.default", Priority.newInstance(1))))); |
| csConf.setBoolean(CapacitySchedulerConfiguration. |
| ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true); |
| scheduler.reinitialize(csConf, scheduler.getRMContext()); |
| |
| // set master address to local to test that local mode applied if framework |
| // equals local |
| sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); |
| sleepConf |
| .setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5); |
| sleepConf.set(MRJobConfig.JOB_TAGS, |
| YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + "wf1"); |
| |
| SleepJob sleepJob = new SleepJob(); |
| sleepJob.setConf(sleepConf); |
| Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1); |
| |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.setJarByClass(SleepJob.class); |
| job.setMaxMapAttempts(1); // speed up failures |
| // VERY_HIGH priority should get overwritten by workflow priority mapping |
| job.setPriority(JobPriority.VERY_HIGH); |
| job.submit(); |
| |
| waitForPriorityToUpdate(job, JobPriority.VERY_LOW); |
| // Verify the priority from job itself |
| Assert.assertEquals(JobPriority.VERY_LOW, job.getPriority()); |
| |
| boolean succeeded = job.waitForCompletion(true); |
| Assert.assertTrue(succeeded); |
| Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); |
| } |
| |
| private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus) |
| throws IOException, InterruptedException { |
| // Max wait time to get the priority update can be kept as 20sec (100 * |
| // 100ms) |
| int waitCnt = 200; |
| while (waitCnt-- > 0) { |
| if (job.getPriority().equals(expectedStatus)) { |
| // Stop waiting as priority is updated. |
| break; |
| } else { |
| Thread.sleep(100); |
| } |
| } |
| } |
| |
| @Test(timeout = 300000) |
| public void testJobClassloader() throws IOException, InterruptedException, |
| ClassNotFoundException { |
| testJobClassloader(false); |
| } |
| |
| @Test(timeout = 300000) |
| public void testJobClassloaderWithCustomClasses() throws IOException, |
| InterruptedException, ClassNotFoundException { |
| testJobClassloader(true); |
| } |
| |
| private void testJobClassloader(boolean useCustomClasses) throws IOException, |
| InterruptedException, ClassNotFoundException { |
| LOG.info("\n\n\nStarting testJobClassloader()" |
| + " useCustomClasses=" + useCustomClasses); |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| final Configuration sleepConf = new Configuration(mrCluster.getConfig()); |
| // set master address to local to test that local mode applied iff framework == local |
| sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); |
| sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); |
| if (useCustomClasses) { |
| // to test AM loading user classes such as output format class, we want |
| // to blacklist them from the system classes (they need to be prepended |
| // as the first match wins) |
| String systemClasses = ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT; |
| // exclude the custom classes from system classes |
| systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" + |
| CustomSpeculator.class.getName() + "," + |
| systemClasses; |
| sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES, |
| systemClasses); |
| } |
| sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB); |
| sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString()); |
| sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); |
| sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString()); |
| sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class"); |
| final SleepJob sleepJob = new SleepJob(); |
| sleepJob.setConf(sleepConf); |
| final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1); |
| job.setMapperClass(ConfVerificationMapper.class); |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.setJarByClass(SleepJob.class); |
| job.setMaxMapAttempts(1); // speed up failures |
| if (useCustomClasses) { |
| // set custom output format class and speculator class |
| job.setOutputFormatClass(CustomOutputFormat.class); |
| final Configuration jobConf = job.getConfiguration(); |
| jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class, |
| Speculator.class); |
| // speculation needs to be enabled for the speculator to be loaded |
| jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true); |
| } |
| job.submit(); |
| boolean succeeded = job.waitForCompletion(true); |
| Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(), |
| succeeded); |
| } |
| |
| public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> { |
| public CustomOutputFormat() { |
| verifyClassLoader(getClass()); |
| } |
| |
| /** |
| * Verifies that the class was loaded by the job classloader if it is in the |
| * context of the MRAppMaster, and if not throws an exception to fail the |
| * job. |
| */ |
| private void verifyClassLoader(Class<?> cls) { |
| // to detect that it is instantiated in the context of the MRAppMaster, we |
| // inspect the stack trace and determine a caller is MRAppMaster |
| for (StackTraceElement e: new Throwable().getStackTrace()) { |
| if (e.getClassName().equals(MRAppMaster.class.getName()) && |
| !(cls.getClassLoader() instanceof ApplicationClassLoader)) { |
| throw new ExceptionInInitializerError("incorrect classloader used"); |
| } |
| } |
| } |
| } |
| |
| public static class CustomSpeculator extends DefaultSpeculator { |
| public CustomSpeculator(Configuration conf, AppContext context) { |
| super(conf, context); |
| verifyClassLoader(getClass()); |
| } |
| |
| /** |
| * Verifies that the class was loaded by the job classloader if it is in the |
| * context of the MRAppMaster, and if not throws an exception to fail the |
| * job. |
| */ |
| private void verifyClassLoader(Class<?> cls) { |
| // to detect that it is instantiated in the context of the MRAppMaster, we |
| // inspect the stack trace and determine a caller is MRAppMaster |
| for (StackTraceElement e: new Throwable().getStackTrace()) { |
| if (e.getClassName().equals(MRAppMaster.class.getName()) && |
| !(cls.getClassLoader() instanceof ApplicationClassLoader)) { |
| throw new ExceptionInInitializerError("incorrect classloader used"); |
| } |
| } |
| } |
| } |
| |
| protected void verifySleepJobCounters(Job job) throws InterruptedException, |
| IOException { |
| Counters counters = job.getCounters(); |
| Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) |
| .getValue()); |
| Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) |
| .getValue()); |
| Assert.assertEquals(numSleepReducers, |
| counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue()); |
| } |
| |
| protected void verifyTaskProgress(Job job) throws InterruptedException, |
| IOException { |
| for (TaskReport taskReport : job.getTaskReports(TaskType.MAP)) { |
| Assert.assertTrue(0.9999f < taskReport.getProgress() |
| && 1.0001f > taskReport.getProgress()); |
| } |
| for (TaskReport taskReport : job.getTaskReports(TaskType.REDUCE)) { |
| Assert.assertTrue(0.9999f < taskReport.getProgress() |
| && 1.0001f > taskReport.getProgress()); |
| } |
| } |
| |
| @Test (timeout = 60000) |
| public void testRandomWriter() throws IOException, InterruptedException, |
| ClassNotFoundException { |
| |
| LOG.info("\n\n\nStarting testRandomWriter()."); |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| RandomTextWriterJob randomWriterJob = new RandomTextWriterJob(); |
| mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072"); |
| mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024"); |
| Job job = randomWriterJob.createJob(mrCluster.getConfig()); |
| Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output"); |
| FileOutputFormat.setOutputPath(job, outputDir); |
| job.setSpeculativeExecution(false); |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.setJarByClass(RandomTextWriterJob.class); |
| job.setMaxMapAttempts(1); // speed up failures |
| job.submit(); |
| String trackingUrl = job.getTrackingURL(); |
| String jobId = job.getJobID().toString(); |
| boolean succeeded = job.waitForCompletion(true); |
| Assert.assertTrue(succeeded); |
| Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); |
| Assert.assertTrue("Tracking URL was " + trackingUrl + |
| " but didn't Match Job ID " + jobId , |
| trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); |
| |
| // Make sure there are three files in the output-dir |
| |
| RemoteIterator<FileStatus> iterator = |
| FileContext.getFileContext(mrCluster.getConfig()).listStatus( |
| outputDir); |
| int count = 0; |
| while (iterator.hasNext()) { |
| FileStatus file = iterator.next(); |
| if (!file.getPath().getName() |
| .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) { |
| count++; |
| } |
| } |
| Assert.assertEquals("Number of part files is wrong!", 3, count); |
| verifyRandomWriterCounters(job); |
| |
| // TODO later: add explicit "isUber()" checks of some sort |
| } |
| |
| protected void verifyRandomWriterCounters(Job job) |
| throws InterruptedException, IOException { |
| Counters counters = job.getCounters(); |
| Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) |
| .getValue()); |
| Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) |
| .getValue()); |
| } |
| |
| @Test (timeout = 60000) |
| public void testFailingMapper() throws IOException, InterruptedException, |
| ClassNotFoundException { |
| |
| LOG.info("\n\n\nStarting testFailingMapper()."); |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| Job job = runFailingMapperJob(); |
| |
| TaskID taskID = new TaskID(job.getJobID(), TaskType.MAP, 0); |
| TaskAttemptID aId = new TaskAttemptID(taskID, 0); |
| System.out.println("Diagnostics for " + aId + " :"); |
| for (String diag : job.getTaskDiagnostics(aId)) { |
| System.out.println(diag); |
| } |
| aId = new TaskAttemptID(taskID, 1); |
| System.out.println("Diagnostics for " + aId + " :"); |
| for (String diag : job.getTaskDiagnostics(aId)) { |
| System.out.println(diag); |
| } |
| |
| TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2); |
| Assert.assertEquals(TaskCompletionEvent.Status.FAILED, |
| events[0].getStatus()); |
| Assert.assertEquals(TaskCompletionEvent.Status.TIPFAILED, |
| events[1].getStatus()); |
| Assert.assertEquals(JobStatus.State.FAILED, job.getJobState()); |
| verifyFailingMapperCounters(job); |
| |
| // TODO later: add explicit "isUber()" checks of some sort |
| } |
| |
| protected void verifyFailingMapperCounters(Job job) |
| throws InterruptedException, IOException { |
| Counters counters = job.getCounters(); |
| Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS) |
| .getValue()); |
| Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) |
| .getValue()); |
| Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS) |
| .getValue()); |
| Assert |
| .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null |
| && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0); |
| } |
| |
| protected Job runFailingMapperJob() |
| throws IOException, InterruptedException, ClassNotFoundException { |
| Configuration myConf = new Configuration(mrCluster.getConfig()); |
| myConf.setInt(MRJobConfig.NUM_MAPS, 1); |
| myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts |
| |
| Job job = Job.getInstance(myConf); |
| |
| job.setJarByClass(FailingMapper.class); |
| job.setJobName("failmapper"); |
| job.setOutputKeyClass(Text.class); |
| job.setOutputValueClass(Text.class); |
| job.setInputFormatClass(RandomInputFormat.class); |
| job.setOutputFormatClass(TextOutputFormat.class); |
| job.setMapperClass(FailingMapper.class); |
| job.setNumReduceTasks(0); |
| |
| FileOutputFormat.setOutputPath(job, new Path(OUTPUT_ROOT_DIR, |
| "failmapper-output")); |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.submit(); |
| String trackingUrl = job.getTrackingURL(); |
| String jobId = job.getJobID().toString(); |
| boolean succeeded = job.waitForCompletion(true); |
| Assert.assertFalse(succeeded); |
| Assert.assertTrue("Tracking URL was " + trackingUrl + |
| " but didn't Match Job ID " + jobId , |
| trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); |
| return job; |
| } |
| |
| //@Test (timeout = 60000) |
| public void testSleepJobWithSecurityOn() throws IOException, |
| InterruptedException, ClassNotFoundException { |
| |
| LOG.info("\n\n\nStarting testSleepJobWithSecurityOn()."); |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| return; |
| } |
| |
| mrCluster.getConfig().set( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, |
| "kerberos"); |
| mrCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab"); |
| mrCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab"); |
| mrCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL, |
| "rm/sightbusy-lx@LOCALHOST"); |
| mrCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL, |
| "nm/sightbusy-lx@LOCALHOST"); |
| UserGroupInformation.setConfiguration(mrCluster.getConfig()); |
| |
| // Keep it in here instead of after RM/NM as multiple user logins happen in |
| // the same JVM. |
| UserGroupInformation user = UserGroupInformation.getCurrentUser(); |
| |
| LOG.info("User name is " + user.getUserName()); |
| for (Token<? extends TokenIdentifier> str : user.getTokens()) { |
| LOG.info("Token is " + str.encodeToUrlString()); |
| } |
| user.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| SleepJob sleepJob = new SleepJob(); |
| sleepJob.setConf(mrCluster.getConfig()); |
| Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0); |
| // //Job with reduces |
| // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1); |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.submit(); |
| String trackingUrl = job.getTrackingURL(); |
| String jobId = job.getJobID().toString(); |
| job.waitForCompletion(true); |
| Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); |
| Assert.assertTrue("Tracking URL was " + trackingUrl + |
| " but didn't Match Job ID " + jobId , |
| trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); |
| return null; |
| } |
| }); |
| |
| // TODO later: add explicit "isUber()" checks of some sort |
| } |
| |
| @Test(timeout = 120000) |
| public void testContainerRollingLog() throws IOException, |
| InterruptedException, ClassNotFoundException { |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| final SleepJob sleepJob = new SleepJob(); |
| final JobConf sleepConf = new JobConf(mrCluster.getConfig()); |
| sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString()); |
| final long userLogKb = 4; |
| sleepConf.setLong(MRJobConfig.TASK_USERLOG_LIMIT, userLogKb); |
| sleepConf.setInt(MRJobConfig.TASK_LOG_BACKUPS, 3); |
| sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString()); |
| final long amLogKb = 7; |
| sleepConf.setLong(MRJobConfig.MR_AM_LOG_KB, amLogKb); |
| sleepConf.setInt(MRJobConfig.MR_AM_LOG_BACKUPS, 7); |
| sleepJob.setConf(sleepConf); |
| |
| final Job job = sleepJob.createJob(1, 0, 1L, 100, 0L, 0); |
| job.setJarByClass(SleepJob.class); |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.waitForCompletion(true); |
| final JobId jobId = TypeConverter.toYarn(job.getJobID()); |
| final ApplicationId appID = jobId.getAppId(); |
| int pollElapsed = 0; |
| while (true) { |
| Thread.sleep(1000); |
| pollElapsed += 1000; |
| if (TERMINAL_RM_APP_STATES.contains( |
| mrCluster.getResourceManager().getRMContext().getRMApps().get(appID) |
| .getState())) { |
| break; |
| } |
| if (pollElapsed >= 60000) { |
| LOG.warn("application did not reach terminal state within 60 seconds"); |
| break; |
| } |
| } |
| Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager() |
| .getRMContext().getRMApps().get(appID).getState()); |
| |
| // Job finished, verify logs |
| // |
| |
| final String appIdStr = appID.toString(); |
| final String appIdSuffix = appIdStr.substring("application_".length(), |
| appIdStr.length()); |
| final String containerGlob = "container_" + appIdSuffix + "_*_*"; |
| final String syslogGlob = appIdStr |
| + Path.SEPARATOR + containerGlob |
| + Path.SEPARATOR + TaskLog.LogName.SYSLOG; |
| int numAppMasters = 0; |
| int numMapTasks = 0; |
| |
| for (int i = 0; i < NUM_NODE_MGRS; i++) { |
| final Configuration nmConf = mrCluster.getNodeManager(i).getConfig(); |
| for (String logDir : |
| nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) { |
| final Path absSyslogGlob = |
| new Path(logDir + Path.SEPARATOR + syslogGlob); |
| LOG.info("Checking for glob: " + absSyslogGlob); |
| final FileStatus[] syslogs = localFs.globStatus(absSyslogGlob); |
| for (FileStatus slog : syslogs) { |
| boolean foundAppMaster = job.isUber(); |
| final Path containerPathComponent = slog.getPath().getParent(); |
| if (!foundAppMaster) { |
| final ContainerId cid = ContainerId.fromString( |
| containerPathComponent.getName()); |
| foundAppMaster = |
| ((cid.getContainerId() & ContainerId.CONTAINER_ID_BITMASK)== 1); |
| } |
| |
| final FileStatus[] sysSiblings = localFs.globStatus(new Path( |
| containerPathComponent, TaskLog.LogName.SYSLOG + "*")); |
| // sort to ensure for i > 0 sysSiblings[i] == "syslog.i" |
| Arrays.sort(sysSiblings); |
| |
| if (foundAppMaster) { |
| numAppMasters++; |
| } else { |
| numMapTasks++; |
| } |
| |
| if (foundAppMaster) { |
| Assert.assertSame("Unexpected number of AM sylog* files", |
| sleepConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, 0) + 1, |
| sysSiblings.length); |
| Assert.assertTrue("AM syslog.1 length kb should be >= " + amLogKb, |
| sysSiblings[1].getLen() >= amLogKb * 1024); |
| } else { |
| Assert.assertSame("Unexpected number of MR task sylog* files", |
| sleepConf.getInt(MRJobConfig.TASK_LOG_BACKUPS, 0) + 1, |
| sysSiblings.length); |
| Assert.assertTrue("MR syslog.1 length kb should be >= " + userLogKb, |
| sysSiblings[1].getLen() >= userLogKb * 1024); |
| } |
| } |
| } |
| } |
| // Make sure we checked non-empty set |
| // |
| Assert.assertEquals("No AppMaster log found!", 1, numAppMasters); |
| if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) { |
| Assert.assertEquals("MapTask log with uber found!", 0, numMapTasks); |
| } else { |
| Assert.assertEquals("No MapTask log found!", 1, numMapTasks); |
| } |
| } |
| |
| public static class DistributedCacheChecker extends |
| Mapper<LongWritable, Text, NullWritable, NullWritable> { |
| |
| @Override |
| public void setup(Context context) throws IOException { |
| Configuration conf = context.getConfiguration(); |
| Path[] localFiles = context.getLocalCacheFiles(); |
| URI[] files = context.getCacheFiles(); |
| Path[] localArchives = context.getLocalCacheArchives(); |
| URI[] archives = context.getCacheArchives(); |
| |
| // Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files |
| // and 2 archives are present |
| Assert.assertEquals(4, localFiles.length); |
| Assert.assertEquals(4, files.length); |
| Assert.assertEquals(2, localArchives.length); |
| Assert.assertEquals(2, archives.length); |
| |
| // Check lengths of the files |
| Map<String, Path> filesMap = pathsToMap(localFiles); |
| Assert.assertTrue(filesMap.containsKey("distributed.first.symlink")); |
| Assert.assertEquals(1, localFs.getFileStatus( |
| filesMap.get("distributed.first.symlink")).getLen()); |
| Assert.assertTrue(filesMap.containsKey("distributed.second.jar")); |
| Assert.assertTrue(localFs.getFileStatus( |
| filesMap.get("distributed.second.jar")).getLen() > 1); |
| |
| // Check extraction of the archive |
| Map<String, Path> archivesMap = pathsToMap(localArchives); |
| Assert.assertTrue(archivesMap.containsKey("distributed.third.jar")); |
| Assert.assertTrue(localFs.exists(new Path( |
| archivesMap.get("distributed.third.jar"), "distributed.jar.inside3"))); |
| Assert.assertTrue(archivesMap.containsKey("distributed.fourth.jar")); |
| Assert.assertTrue(localFs.exists(new Path( |
| archivesMap.get("distributed.fourth.jar"), "distributed.jar.inside4"))); |
| |
| // Check the class loaders |
| LOG.info("Java Classpath: " + System.getProperty("java.class.path")); |
| ClassLoader cl = Thread.currentThread().getContextClassLoader(); |
| // Both the file and the archive should have been added to classpath, so |
| // both should be reachable via the class loader. |
| Assert.assertNotNull(cl.getResource("distributed.jar.inside2")); |
| Assert.assertNotNull(cl.getResource("distributed.jar.inside3")); |
| Assert.assertNotNull(cl.getResource("distributed.jar.inside4")); |
| // The Job Jar should have been extracted to a folder named "job.jar" and |
| // added to the classpath; the two jar files in the lib folder in the Job |
| // Jar should have also been added to the classpath |
| Assert.assertNotNull(cl.getResource("job.jar/")); |
| Assert.assertNotNull(cl.getResource("job.jar/lib/lib1.jar")); |
| Assert.assertNotNull(cl.getResource("job.jar/lib/lib2.jar")); |
| |
| // Check that the symlink for the renaming was created in the cwd; |
| File symlinkFile = new File("distributed.first.symlink"); |
| Assert.assertTrue(symlinkFile.exists()); |
| Assert.assertEquals(1, symlinkFile.length()); |
| |
| // Check that the symlink for the Job Jar was created in the cwd and |
| // points to the extracted directory |
| File jobJarDir = new File("job.jar"); |
| if (Shell.WINDOWS) { |
| Assert.assertTrue(isWindowsSymlinkedDirectory(jobJarDir)); |
| } else { |
| Assert.assertTrue(FileUtils.isSymlink(jobJarDir)); |
| Assert.assertTrue(jobJarDir.isDirectory()); |
| } |
| } |
| |
| /** |
| * Used on Windows to determine if the specified file is a symlink that |
| * targets a directory. On most platforms, these checks can be done using |
| * commons-io. On Windows, the commons-io implementation is unreliable and |
| * always returns false. Instead, this method checks the output of the dir |
| * command. After migrating to Java 7, this method can be removed in favor |
| * of the new method java.nio.file.Files.isSymbolicLink, which is expected to |
| * work cross-platform. |
| * |
| * @param file File to check |
| * @return boolean true if the file is a symlink that targets a directory |
| * @throws IOException thrown for any I/O error |
| */ |
| private static boolean isWindowsSymlinkedDirectory(File file) |
| throws IOException { |
| String dirOut = Shell.execCommand("cmd", "/c", "dir", |
| file.getAbsoluteFile().getParent()); |
| StringReader sr = new StringReader(dirOut); |
| BufferedReader br = new BufferedReader(sr); |
| try { |
| String line = br.readLine(); |
| while (line != null) { |
| line = br.readLine(); |
| if (line.contains(file.getName()) && line.contains("<SYMLINKD>")) { |
| return true; |
| } |
| } |
| return false; |
| } finally { |
| IOUtils.closeStream(br); |
| IOUtils.closeStream(sr); |
| } |
| } |
| |
| /** |
| * Returns a mapping of the final component of each path to the corresponding |
| * Path instance. This assumes that every given Path has a unique string in |
| * the final path component, which is true for these tests. |
| * |
| * @param paths Path[] to map |
| * @return Map<String, Path> mapping the final component of each path to the |
| * corresponding Path instance |
| */ |
| private static Map<String, Path> pathsToMap(Path[] paths) { |
| Map<String, Path> map = new HashMap<String, Path>(); |
| for (Path path: paths) { |
| map.put(path.getName(), path); |
| } |
| return map; |
| } |
| } |
| |
| private void testDistributedCache(String jobJarPath, boolean withWildcard) |
| throws Exception { |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| // Create a temporary file of length 1. |
| Path first = createTempFile("distributed.first", "x"); |
| // Create three jars with a single file inside them. |
| Path second = |
| makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2); |
| Path third = |
| makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3); |
| Path fourth = |
| makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); |
| |
| Job job = Job.getInstance(mrCluster.getConfig()); |
| |
| // Set the job jar to a new "dummy" jar so we can check that its extracted |
| // properly |
| job.setJar(jobJarPath); |
| |
| if (withWildcard) { |
| // If testing with wildcards, upload the DistributedCacheChecker into HDFS |
| // and add the directory as a wildcard. |
| Path libs = new Path("testLibs"); |
| Path wildcard = remoteFs.makeQualified(new Path(libs, "*")); |
| |
| remoteFs.mkdirs(libs); |
| remoteFs.copyFromLocalFile(third, libs); |
| job.addCacheFile(wildcard.toUri()); |
| } else { |
| // Otherwise add the DistributedCacheChecker directly to the classpath. |
| // Because the job jar is a "dummy" jar, we need to include the jar with |
| // DistributedCacheChecker or it won't be able to find it |
| Path distributedCacheCheckerJar = new Path( |
| JarFinder.getJar(DistributedCacheChecker.class)); |
| job.addFileToClassPath(localFs.makeQualified(distributedCacheCheckerJar)); |
| } |
| |
| job.setMapperClass(DistributedCacheChecker.class); |
| job.setOutputFormatClass(NullOutputFormat.class); |
| |
| FileInputFormat.setInputPaths(job, first); |
| // Creates the Job Configuration |
| job.addCacheFile( |
| new URI(first.toUri().toString() + "#distributed.first.symlink")); |
| job.addFileToClassPath(second); |
| // The AppMaster jar itself |
| job.addFileToClassPath( |
| APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent())); |
| job.addArchiveToClassPath(third); |
| job.addCacheArchive(fourth.toUri()); |
| job.setMaxMapAttempts(1); // speed up failures |
| |
| job.submit(); |
| String trackingUrl = job.getTrackingURL(); |
| String jobId = job.getJobID().toString(); |
| Assert.assertTrue(job.waitForCompletion(false)); |
| Assert.assertTrue("Tracking URL was " + trackingUrl + |
| " but didn't Match Job ID " + jobId , |
| trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); |
| } |
| |
| private void testDistributedCache(boolean withWildcard) throws Exception { |
| // Test with a local (file:///) Job Jar |
| Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()); |
| testDistributedCache(localJobJarPath.toUri().toString(), withWildcard); |
| |
| // Test with a remote (hdfs://) Job Jar |
| Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/", |
| localJobJarPath.getName()); |
| remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath); |
| File localJobJarFile = new File(localJobJarPath.toUri().toString()); |
| if (localJobJarFile.exists()) { // just to make sure |
| localJobJarFile.delete(); |
| } |
| testDistributedCache(remoteJobJarPath.toUri().toString(), withWildcard); |
| } |
| |
| @Test (timeout = 300000) |
| public void testDistributedCache() throws Exception { |
| testDistributedCache(false); |
| } |
| |
| @Test (timeout = 300000) |
| public void testDistributedCacheWithWildcards() throws Exception { |
| testDistributedCache(true); |
| } |
| |
| @Test(timeout = 120000) |
| public void testThreadDumpOnTaskTimeout() throws IOException, |
| InterruptedException, ClassNotFoundException { |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| final SleepJob sleepJob = new SleepJob(); |
| final JobConf sleepConf = new JobConf(mrCluster.getConfig()); |
| sleepConf.setLong(MRJobConfig.TASK_TIMEOUT, 3 * 1000L); |
| sleepConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1); |
| sleepJob.setConf(sleepConf); |
| if (this instanceof TestUberAM) { |
| sleepConf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, |
| 30 * 1000); |
| } |
| // sleep for 10 seconds to trigger a kill with thread dump |
| final Job job = sleepJob.createJob(1, 0, 10 * 60 * 1000L, 1, 0L, 0); |
| job.setJarByClass(SleepJob.class); |
| job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. |
| job.waitForCompletion(true); |
| final JobId jobId = TypeConverter.toYarn(job.getJobID()); |
| final ApplicationId appID = jobId.getAppId(); |
| int pollElapsed = 0; |
| while (true) { |
| Thread.sleep(1000); |
| pollElapsed += 1000; |
| if (TERMINAL_RM_APP_STATES.contains(mrCluster.getResourceManager() |
| .getRMContext().getRMApps().get(appID).getState())) { |
| break; |
| } |
| if (pollElapsed >= 60000) { |
| LOG.warn("application did not reach terminal state within 60 seconds"); |
| break; |
| } |
| } |
| |
| // Job finished, verify logs |
| // |
| |
| final String appIdStr = appID.toString(); |
| final String appIdSuffix = appIdStr.substring("application_".length(), |
| appIdStr.length()); |
| final String containerGlob = "container_" + appIdSuffix + "_*_*"; |
| final String syslogGlob = appIdStr |
| + Path.SEPARATOR + containerGlob |
| + Path.SEPARATOR + TaskLog.LogName.SYSLOG; |
| int numAppMasters = 0; |
| int numMapTasks = 0; |
| |
| for (int i = 0; i < NUM_NODE_MGRS; i++) { |
| final Configuration nmConf = mrCluster.getNodeManager(i).getConfig(); |
| for (String logDir : |
| nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) { |
| final Path absSyslogGlob = |
| new Path(logDir + Path.SEPARATOR + syslogGlob); |
| LOG.info("Checking for glob: " + absSyslogGlob); |
| for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) { |
| boolean foundAppMaster = false; |
| boolean foundThreadDump = false; |
| |
| // Determine the container type |
| final BufferedReader syslogReader = new BufferedReader( |
| new InputStreamReader(localFs.open(syslog.getPath()))); |
| try { |
| for (String line; (line = syslogReader.readLine()) != null; ) { |
| if (line.contains(MRAppMaster.class.getName())) { |
| foundAppMaster = true; |
| break; |
| } |
| } |
| } finally { |
| syslogReader.close(); |
| } |
| |
| // Check for thread dump in stdout |
| final Path stdoutPath = new Path(syslog.getPath().getParent(), |
| TaskLog.LogName.STDOUT.toString()); |
| final BufferedReader stdoutReader = new BufferedReader( |
| new InputStreamReader(localFs.open(stdoutPath))); |
| try { |
| for (String line; (line = stdoutReader.readLine()) != null; ) { |
| if (line.contains("Full thread dump")) { |
| foundThreadDump = true; |
| break; |
| } |
| } |
| } finally { |
| stdoutReader.close(); |
| } |
| |
| if (foundAppMaster) { |
| numAppMasters++; |
| if (this instanceof TestUberAM) { |
| Assert.assertTrue("No thread dump", foundThreadDump); |
| } else { |
| Assert.assertFalse("Unexpected thread dump", foundThreadDump); |
| } |
| } else { |
| numMapTasks++; |
| Assert.assertTrue("No thread dump", foundThreadDump); |
| } |
| } |
| } |
| } |
| |
| // Make sure we checked non-empty set |
| // |
| Assert.assertEquals("No AppMaster log found!", 1, numAppMasters); |
| if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) { |
| Assert.assertSame("MapTask log with uber found!", 0, numMapTasks); |
| } else { |
| Assert.assertSame("No MapTask log found!", 1, numMapTasks); |
| } |
| } |
| |
| private Path createTempFile(String filename, String contents) |
| throws IOException { |
| Path path = new Path(TEST_ROOT_DIR, filename); |
| FSDataOutputStream os = localFs.create(path); |
| os.writeBytes(contents); |
| os.close(); |
| localFs.setPermission(path, new FsPermission("700")); |
| return path; |
| } |
| |
| private Path makeJar(Path p, int index) throws FileNotFoundException, |
| IOException { |
| FileOutputStream fos = |
| new FileOutputStream(new File(p.toUri().getPath())); |
| JarOutputStream jos = new JarOutputStream(fos); |
| ZipEntry ze = new ZipEntry("distributed.jar.inside" + index); |
| jos.putNextEntry(ze); |
| jos.write(("inside the jar!" + index).getBytes()); |
| jos.closeEntry(); |
| jos.close(); |
| localFs.setPermission(p, new FsPermission("700")); |
| return p; |
| } |
| |
| private Path makeJobJarWithLib(String testDir) throws FileNotFoundException, |
| IOException{ |
| Path jobJarPath = new Path(testDir, "thejob.jar"); |
| FileOutputStream fos = |
| new FileOutputStream(new File(jobJarPath.toUri().getPath())); |
| JarOutputStream jos = new JarOutputStream(fos); |
| // Have to put in real jar files or it will complain |
| createAndAddJarToJar(jos, new File( |
| new Path(testDir, "lib1.jar").toUri().getPath())); |
| createAndAddJarToJar(jos, new File( |
| new Path(testDir, "lib2.jar").toUri().getPath())); |
| jos.close(); |
| localFs.setPermission(jobJarPath, new FsPermission("700")); |
| return jobJarPath; |
| } |
| |
| private void createAndAddJarToJar(JarOutputStream jos, File jarFile) |
| throws FileNotFoundException, IOException { |
| FileOutputStream fos2 = new FileOutputStream(jarFile); |
| JarOutputStream jos2 = new JarOutputStream(fos2); |
| // Have to have at least one entry or it will complain |
| ZipEntry ze = new ZipEntry("lib1.inside"); |
| jos2.putNextEntry(ze); |
| jos2.closeEntry(); |
| jos2.close(); |
| ze = new ZipEntry("lib/" + jarFile.getName()); |
| jos.putNextEntry(ze); |
| FileInputStream in = new FileInputStream(jarFile); |
| byte buf[] = new byte[1024]; |
| int numRead; |
| do { |
| numRead = in.read(buf); |
| if (numRead >= 0) { |
| jos.write(buf, 0, numRead); |
| } |
| } while (numRead != -1); |
| in.close(); |
| jos.closeEntry(); |
| jarFile.delete(); |
| } |
| |
| @Test |
| public void testSharedCache() throws Exception { |
| Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()); |
| |
| if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { |
| LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR |
| + " not found. Not running test."); |
| return; |
| } |
| |
| Job job = Job.getInstance(mrCluster.getConfig()); |
| |
| Configuration jobConf = job.getConfiguration(); |
| jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled"); |
| |
| Path inputFile = createTempFile("input-file", "x"); |
| |
| // Create jars with a single file inside them. |
| Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2); |
| Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3); |
| Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); |
| |
| // Add libjars to job conf |
| jobConf.set("tmpjars", second.toString() + "," + third.toString() + "," |
| + fourth.toString()); |
| |
| // Because the job jar is a "dummy" jar, we need to include the jar with |
| // DistributedCacheChecker or it won't be able to find it |
| Path distributedCacheCheckerJar = |
| new Path(JarFinder.getJar(SharedCacheChecker.class)); |
| job.addFileToClassPath(distributedCacheCheckerJar.makeQualified( |
| localFs.getUri(), distributedCacheCheckerJar.getParent())); |
| |
| job.setMapperClass(SharedCacheChecker.class); |
| job.setOutputFormatClass(NullOutputFormat.class); |
| |
| FileInputFormat.setInputPaths(job, inputFile); |
| |
| job.setMaxMapAttempts(1); // speed up failures |
| |
| job.submit(); |
| String trackingUrl = job.getTrackingURL(); |
| String jobId = job.getJobID().toString(); |
| Assert.assertTrue(job.waitForCompletion(true)); |
| Assert.assertTrue("Tracking URL was " + trackingUrl |
| + " but didn't Match Job ID " + jobId, |
| trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); |
| } |
| |
| /** |
| * An identity mapper for testing the shared cache. |
| */ |
| public static class SharedCacheChecker extends |
| Mapper<LongWritable, Text, NullWritable, NullWritable> { |
| @Override |
| public void setup(Context context) throws IOException { |
| } |
| } |
| |
| public static class ConfVerificationMapper extends SleepMapper { |
| @Override |
| protected void setup(Context context) |
| throws IOException, InterruptedException { |
| super.setup(context); |
| final Configuration conf = context.getConfiguration(); |
| // check if the job classloader is enabled and verify the TCCL |
| if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) { |
| ClassLoader tccl = Thread.currentThread().getContextClassLoader(); |
| if (!(tccl instanceof ApplicationClassLoader)) { |
| throw new IOException("TCCL expected: " + |
| ApplicationClassLoader.class.getName() + ", actual: " + |
| tccl.getClass().getName()); |
| } |
| } |
| final String ioSortMb = conf.get(MRJobConfig.IO_SORT_MB); |
| if (!TEST_IO_SORT_MB.equals(ioSortMb)) { |
| throw new IOException("io.sort.mb expected: " + TEST_IO_SORT_MB |
| + ", actual: " + ioSortMb); |
| } |
| } |
| } |
| |
| @Test |
| public void testSleepJobName() throws IOException { |
| SleepJob sleepJob = new SleepJob(); |
| sleepJob.setConf(conf); |
| |
| Job job1 = sleepJob.createJob(1, 1, 1, 1, 1, 1); |
| assertThat(job1.getJobName()) |
| .withFailMessage("Wrong default name of sleep job.") |
| .isEqualTo(SleepJob.SLEEP_JOB_NAME); |
| |
| String expectedJob2Name = SleepJob.SLEEP_JOB_NAME + " - test"; |
| Job job2 = sleepJob.createJob(1, 1, 1, 1, 1, 1, "test"); |
| assertThat(job2.getJobName()) |
| .withFailMessage("Wrong name of sleep job.") |
| .isEqualTo(expectedJob2Name); |
| } |
| } |