| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred.gridmix; |
| |
| import static org.junit.Assert.*; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.MapContext; |
| import org.apache.hadoop.mapreduce.MapReduceTestUtil; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
| import org.apache.hadoop.mapreduce.task.MapContextImpl; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| /** |
| * Validate emulation of distributed cache load in gridmix simulated jobs. |
| * |
| */ |
| public class TestDistCacheEmulation { |
| |
  /** The emulator instance under test; (re)created by the individual tests. */
  private DistributedCacheEmulator dce = null;
| |
| @BeforeClass |
| public static void init() throws IOException { |
| GridmixTestUtils.initCluster(); |
| } |
| |
| @AfterClass |
| public static void shutDown() throws IOException { |
| GridmixTestUtils.shutdownCluster(); |
| } |
| |
| /** |
| * Validate the dist cache files generated by GenerateDistCacheData job. |
| * @param jobConf configuration of GenerateDistCacheData job. |
| * @param sortedFileSizes array of sorted distributed cache file sizes |
| * @throws IOException |
| * @throws FileNotFoundException |
| */ |
| private void validateDistCacheData(JobConf jobConf, long[] sortedFileSizes) |
| throws FileNotFoundException, IOException { |
| Path distCachePath = dce.getDistributedCacheDir(); |
| String filesListFile = |
| jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST); |
| FileSystem fs = FileSystem.get(jobConf); |
| |
    // Validate that the distributed cache files list file exists directly
    // under the distributed cache directory
| Path listFile = new Path(filesListFile); |
| assertTrue("Path of Distributed Cache files list file is wrong.", |
| distCachePath.equals(listFile.getParent().makeQualified(fs))); |
| |
| // Delete the dist cache files list file |
| assertTrue("Failed to delete distributed Cache files list file " + listFile, |
| fs.delete(listFile)); |
| |
| List<Long> fileSizes = new ArrayList<Long>(); |
| for (long size : sortedFileSizes) { |
| fileSizes.add(size); |
| } |
| // validate dist cache files after deleting the 'files list file' |
| validateDistCacheFiles(fileSizes, distCachePath); |
| } |
| |
| /** |
| * Validate private/public distributed cache files. |
| * @param filesSizesExpected list of sizes of expected dist cache files |
| * @param distCacheDir the distributed cache dir to be validated |
| * @throws IOException |
| * @throws FileNotFoundException |
| */ |
  private void validateDistCacheFiles(List<Long> filesSizesExpected,
      Path distCacheDir) throws FileNotFoundException, IOException {
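    // Walk the non-recursive listing of the dist cache dir, matching each
    // file's size against the expected sizes and checking its permissions.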
| RemoteIterator<LocatedFileStatus> iter = |
| GridmixTestUtils.dfs.listFiles(distCacheDir, false); |
| int numFiles = filesSizesExpected.size(); |
| for (int i = 0; i < numFiles; i++) { |
| assertTrue("Missing distributed cache files.", iter.hasNext()); |
| LocatedFileStatus stat = iter.next(); |
| assertTrue("File size of distributed cache file " |
| + stat.getPath().toUri().getPath() + " is wrong.", |
| filesSizesExpected.remove(stat.getLen())); |
| |
| FsPermission perm = stat.getPermission(); |
| assertEquals("Wrong permissions for distributed cache file " |
| + stat.getPath().toUri().getPath(), |
| new FsPermission((short)0644), perm); |
| } |
| assertFalse("Number of files under distributed cache dir is wrong.", |
| iter.hasNext()); |
| } |
| |
| /** |
| * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache |
| * file in the given Configuration object <code>conf</code>. |
| * @param conf configuration where dist cache config properties are to be set |
| * @param useOldProperties <code>true</code> if old config properties are to |
| * be set |
| * @return array of sorted HDFS-based distributed cache file sizes |
| * @throws IOException |
| */ |
| private long[] configureDummyDistCacheFiles(Configuration conf, |
| boolean useOldProperties) throws IOException { |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| conf.set(MRJobConfig.USER_NAME, user); |
| // Set some dummy dist cache files in gridmix configuration so that they go |
| // into the configuration of JobStory objects. |
| String[] distCacheFiles = {"hdfs:///tmp/file1.txt", |
| "/tmp/" + user + "/.staging/job_1/file2.txt", |
| "hdfs:///user/user1/file3.txt", |
| "/home/user2/file4.txt", |
| "subdir1/file5.txt", |
| "subdir2/file6.gz"}; |
| String[] fileSizes = {"400", "2500", "700", "1200", "1500", "500"}; |
| |
| String[] visibilities = {"true", "false", "false", "true", "true", "false"}; |
| String[] timeStamps = {"1234", "2345", "34567", "5434", "125", "134"}; |
| if (useOldProperties) { |
| conf.setStrings("mapred.cache.files", distCacheFiles); |
| conf.setStrings("mapred.cache.files.filesizes", fileSizes); |
| conf.setStrings("mapred.cache.files.visibilities", visibilities); |
| conf.setStrings("mapred.cache.files.timestamps", timeStamps); |
| } else { |
| conf.setStrings(MRJobConfig.CACHE_FILES, distCacheFiles); |
| conf.setStrings(MRJobConfig.CACHE_FILES_SIZES, fileSizes); |
| conf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, visibilities); |
| conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timeStamps); |
| } |
| // local FS based dist cache file whose path contains <user>/.staging is |
| // not created on HDFS. So file size 2500 is not added to sortedFileSizes. |
| long[] sortedFileSizes = new long[] {1500, 1200, 700, 500, 400}; |
| return sortedFileSizes; |
| } |
| |
| /** |
| * Runs setupGenerateDistCacheData() on a new DistrbutedCacheEmulator and |
| * and returns the jobConf. Fills the array <code>sortedFileSizes</code> that |
| * can be used for validation. |
| * Validation of exit code from setupGenerateDistCacheData() is done. |
| * @param generate true if -generate option is specified |
| * @param sortedFileSizes sorted HDFS-based distributed cache file sizes |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| private JobConf runSetupGenerateDistCacheData(boolean generate, |
| long[] sortedFileSizes) throws IOException, InterruptedException { |
| Configuration conf = new Configuration(); |
| long[] fileSizes = configureDummyDistCacheFiles(conf, false); |
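    // hand the generated sizes back to the caller through the passed-in array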
| System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length); |
| |
    // Job stories of all 3 jobs will have the same dist cache files in their
    // configurations
| final int numJobs = 3; |
| DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf); |
| |
| JobConf jobConf = |
| GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf)); |
| Path ioPath = new Path("testSetupGenerateDistCacheData") |
| .makeQualified(GridmixTestUtils.dfs); |
| FileSystem fs = FileSystem.get(jobConf); |
| if (fs.exists(ioPath)) { |
| fs.delete(ioPath, true); |
| } |
| FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777)); |
| |
| dce = createDistributedCacheEmulator(jobConf, ioPath, generate); |
| int exitCode = dce.setupGenerateDistCacheData(jobProducer); |
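    // With -generate, setup is expected to succeed (exit code 0); without it,
    // the dist cache files do not exist yet, so setup should report
    // MISSING_DIST_CACHE_FILES_ERROR.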
    int expectedExitCode =
        generate ? 0 : DistributedCacheEmulator.MISSING_DIST_CACHE_FILES_ERROR;
| assertEquals("setupGenerateDistCacheData failed.", |
| expectedExitCode, exitCode); |
| |
| // reset back |
| resetDistCacheConfigProperties(jobConf); |
| return jobConf; |
| } |
| |
| /** |
| * Reset the config properties related to Distributed Cache in the given |
| * job configuration <code>jobConf</code>. |
| * @param jobConf job configuration |
| */ |
| private void resetDistCacheConfigProperties(JobConf jobConf) { |
| // reset current/latest property names |
| jobConf.setStrings(MRJobConfig.CACHE_FILES, ""); |
| jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, ""); |
| jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, ""); |
| jobConf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, ""); |
| // reset old property names |
| jobConf.setStrings("mapred.cache.files", ""); |
| jobConf.setStrings("mapred.cache.files.filesizes", ""); |
| jobConf.setStrings("mapred.cache.files.visibilities", ""); |
| jobConf.setStrings("mapred.cache.files.timestamps", ""); |
| } |
| |
| /** |
| * Validate GenerateDistCacheData job if it creates dist cache files properly. |
| * @throws Exception |
| */ |
| @Test |
| public void testGenerateDistCacheData() throws Exception { |
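    // 5 HDFS-based dist cache files are expected (the 6th configured file
    // lives under <user>/.staging and stays on the local FS).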
| long[] sortedFileSizes = new long[5]; |
| JobConf jobConf = |
| runSetupGenerateDistCacheData(true, sortedFileSizes); |
| GridmixJob gridmixJob = new GenerateDistCacheData(jobConf); |
| Job job = gridmixJob.call(); |
| assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.", |
| 0, job.getNumReduceTasks()); |
| assertTrue("GenerateDistCacheData job failed.", |
| job.waitForCompletion(false)); |
| validateDistCacheData(jobConf, sortedFileSizes); |
| } |
| |
| /** |
| * Validate setupGenerateDistCacheData by validating |
| * <li> permissions of the distributed cache directories and |
| * <li> content of the generated sequence file. This includes validation of |
| * dist cache file paths and their file sizes. |
| */ |
| private void validateSetupGenDC(JobConf jobConf, long[] sortedFileSizes) |
| throws IOException, InterruptedException { |
| // build things needed for validation |
| long sumOfFileSizes = 0; |
| for (int i = 0; i < sortedFileSizes.length; i++) { |
| sumOfFileSizes += sortedFileSizes[i]; |
| } |
| |
| FileSystem fs = FileSystem.get(jobConf); |
| assertEquals("Number of distributed cache files to be generated is wrong.", |
| sortedFileSizes.length, |
| jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1)); |
| assertEquals("Total size of dist cache files to be generated is wrong.", |
| sumOfFileSizes, jobConf.getLong( |
| GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1)); |
| Path filesListFile = new Path(jobConf.get( |
| GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST)); |
| FileStatus stat = fs.getFileStatus(filesListFile); |
| assertEquals("Wrong permissions of dist Cache files list file " |
| + filesListFile, new FsPermission((short)0644), stat.getPermission()); |
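
    // Read the sequence file written by setupGenerateDistCacheData() back
    // through GenDCDataFormat's record reader; each record is a
    // (file size, file path) pair, which doValidateSetupGenDC() verifies.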
| |
| InputSplit split = |
| new FileSplit(filesListFile, 0, stat.getLen(), (String[])null); |
| TaskAttemptContext taskContext = |
| MapReduceTestUtil.createDummyMapTaskAttemptContext(jobConf); |
| RecordReader<LongWritable, BytesWritable> reader = |
| new GenerateDistCacheData.GenDCDataFormat().createRecordReader( |
| split, taskContext); |
| MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable> |
| mapContext = new MapContextImpl<LongWritable, BytesWritable, |
| NullWritable, BytesWritable>(jobConf, taskContext.getTaskAttemptID(), |
| reader, null, null, MapReduceTestUtil.createDummyReporter(), split); |
| reader.initialize(split, mapContext); |
| |
| // start validating setupGenerateDistCacheData |
| doValidateSetupGenDC(reader, fs, sortedFileSizes); |
| } |
| |
| /** |
| * Validate setupGenerateDistCacheData by validating |
| * <li> permissions of the distributed cache directory and |
| * <li> content of the generated sequence file. This includes validation of |
| * dist cache file paths and their file sizes. |
| */ |
| private void doValidateSetupGenDC(RecordReader<LongWritable, BytesWritable> |
| reader, FileSystem fs, long[] sortedFileSizes) |
| throws IOException, InterruptedException { |
| |
| // Validate permissions of dist cache directory |
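    // ('others' need execute permission on this directory so that the public
    // dist cache files under it are path-accessible)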
| Path distCacheDir = dce.getDistributedCacheDir(); |
| assertEquals("Wrong permissions for distributed cache dir " + distCacheDir, |
| fs.getFileStatus(distCacheDir).getPermission() |
| .getOtherAction().and(FsAction.EXECUTE), FsAction.EXECUTE); |
| |
| // Validate the content of the sequence file generated by |
| // dce.setupGenerateDistCacheData(). |
| for (int i = 0; i < sortedFileSizes.length; i++) { |
| assertTrue("Number of files written to the sequence file by " |
| + "setupGenerateDistCacheData is less than the expected.", |
| reader.nextKeyValue()); |
| key = reader.getCurrentKey(); |
| val = reader.getCurrentValue(); |
| long fileSize = key.get(); |
| String file = new String(val.getBytes(), 0, val.getLength()); |
| |
| // Dist Cache files should be sorted based on file size. |
| assertEquals("Dist cache file size is wrong.", |
| sortedFileSizes[i], fileSize); |
| |
| // Validate dist cache file path. |
| |
| // parent dir of dist cache file |
| Path parent = new Path(file).getParent().makeQualified(fs); |
| // should exist in dist cache dir |
| assertTrue("Public dist cache file path is wrong.", |
| distCacheDir.equals(parent)); |
| } |
| } |
| |
| /** |
| * Test if DistributedCacheEmulator's setup of GenerateDistCacheData is |
| * working as expected. |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| @Test |
| public void testSetupGenerateDistCacheData() |
| throws IOException, InterruptedException { |
| long[] sortedFileSizes = new long[5]; |
| JobConf jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes); |
| validateSetupGenDC(jobConf, sortedFileSizes); |
| |
| // Verify if correct exit code is seen when -generate option is missing and |
| // distributed cache files are missing in the expected path. |
| runSetupGenerateDistCacheData(false, sortedFileSizes); |
| } |
| |
| /** |
| * Create DistributedCacheEmulator object and do the initialization by |
| * calling init() on it with dummy trace. Also configure the pseudo local FS. |
| */ |
| private DistributedCacheEmulator createDistributedCacheEmulator( |
| Configuration conf, Path ioPath, boolean generate) throws IOException { |
| DistributedCacheEmulator dce = |
| new DistributedCacheEmulator(conf, ioPath); |
| JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB); |
| jobCreator.setDistCacheEmulator(dce); |
| dce.init("dummytrace", jobCreator, generate); |
| return dce; |
| } |
| |
| /** |
| * Test the configuration property for disabling/enabling emulation of |
| * distributed cache load. |
| */ |
| @Test |
| public void testDistCacheEmulationConfigurability() throws IOException { |
| Configuration conf = new Configuration(); |
| JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf( |
| new JobConf(conf)); |
| Path ioPath = new Path("testDistCacheEmulationConfigurability") |
| .makeQualified(GridmixTestUtils.dfs); |
| FileSystem fs = FileSystem.get(jobConf); |
| FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777)); |
| |
| // default config |
| dce = createDistributedCacheEmulator(jobConf, ioPath, false); |
| assertTrue("Default configuration of " |
| + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE |
| + " is wrong.", dce.shouldEmulateDistCacheLoad()); |
| |
| // config property set to false |
| jobConf.setBoolean( |
| DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false); |
| dce = createDistributedCacheEmulator(jobConf, ioPath, false); |
| assertFalse("Disabling of emulation of distributed cache load by setting " |
| + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE |
| + " to false is not working.", dce.shouldEmulateDistCacheLoad()); |
| } |
| |
| /** |
| * Verify if DistributedCacheEmulator can configure distributed cache files |
| * for simulated job if job conf from trace had no dist cache files. |
| * @param conf configuration for the simulated job to be run |
| * @param jobConf job configuration of original cluster's job, obtained from |
| * trace |
| * @throws IOException |
| */ |
| private void validateJobConfWithOutDCFiles(Configuration conf, |
| JobConf jobConf) throws IOException { |
| // Validate if Gridmix can configure dist cache files properly if there are |
| // no HDFS-based dist cache files and localFS-based dist cache files in |
| // trace for a job. |
| dce.configureDistCacheFiles(conf, jobConf); |
| assertNull("Distributed cache files configured by GridMix is wrong.", |
| conf.get(MRJobConfig.CACHE_FILES)); |
| assertNull("Distributed cache files configured by Gridmix through -files " |
| + "option is wrong.", conf.get("tmpfiles")); |
| } |
| |
| /** |
| * Verify if DistributedCacheEmulator can configure distributed cache files |
| * for simulated job if job conf from trace had HDFS-based dist cache files |
| * and local-FS-based dist cache files. |
| * <br>Also validate if Gridmix can handle/read deprecated config properties |
| * like mapred.cache.files.filesizes and mapred.cache.files.visibilities from |
| * trace file. |
| * @param conf configuration for the simulated job to be run |
| * @param jobConf job configuration of original cluster's job, obtained from |
| * trace |
| * @throws IOException |
| */ |
| private void validateJobConfWithDCFiles(Configuration conf, |
| JobConf jobConf) throws IOException { |
| long[] sortedFileSizes = configureDummyDistCacheFiles(jobConf, true); |
| |
| // Validate if Gridmix can handle deprecated config properties like |
| // mapred.cache.files.filesizes and mapred.cache.files.visibilities. |
| // 1 local FS based dist cache file and 5 HDFS based dist cache files. So |
| // total expected dist cache files count is 6. |
| assertEquals("Gridmix is not able to extract dist cache file sizes.", |
| 6, jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES).length); |
| assertEquals("Gridmix is not able to extract dist cache file visibilities.", |
| 6, jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES).length); |
| |
| dce.configureDistCacheFiles(conf, jobConf); |
| |
| assertEquals("Configuring of HDFS-based dist cache files by gridmix is " |
| + "wrong.", sortedFileSizes.length, |
| conf.getStrings(MRJobConfig.CACHE_FILES).length); |
| assertEquals("Configuring of local-FS-based dist cache files by gridmix is " |
| + "wrong.", 1, conf.getStrings("tmpfiles").length); |
| } |
| |
| /** |
| * Verify if configureDistCacheFiles() works fine when there are distributed |
| * cache files set but visibilities are not set. This is to handle history |
| * traces of older hadoop version where there are no private/public |
| * Distributed Caches. |
| * @throws IOException |
| */ |
| private void validateWithOutVisibilities() throws IOException { |
    // configuration for the simulated job
    Configuration conf = new Configuration();
    JobConf jobConf = new JobConf();
    String user = "user1";
    jobConf.setUser(user);
    String[] files = {"/tmp/hdfs1.txt", "/tmp/" + user + "/.staging/file1"};
| jobConf.setStrings(MRJobConfig.CACHE_FILES, files); |
| jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "12,200"); |
| jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "56789,98345"); |
| dce.configureDistCacheFiles(conf, jobConf); |
| assertEquals("Configuring of HDFS-based dist cache files by gridmix is " |
| + "wrong.", files.length, |
| conf.getStrings(MRJobConfig.CACHE_FILES).length); |
| assertNull("Configuring of local-FS-based dist cache files by gridmix is " |
| + "wrong.", conf.get("tmpfiles")); |
| } |
| |
| /** |
| * Test if Gridmix can configure config properties related to Distributed |
| * Cache properly. Also verify if Gridmix can handle deprecated config |
| * properties related to Distributed Cache. |
| * @throws IOException |
| */ |
| @Test |
| public void testDistCacheFilesConfiguration() throws IOException { |
| Configuration conf = new Configuration(); |
| JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf( |
| new JobConf(conf)); |
    Path ioPath = new Path("testDistCacheFilesConfiguration")
        .makeQualified(GridmixTestUtils.dfs);
| FileSystem fs = FileSystem.get(jobConf); |
| FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777)); |
| |
| // default config |
| dce = createDistributedCacheEmulator(jobConf, ioPath, false); |
| assertTrue("Default configuration of " |
| + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE |
| + " is wrong.", dce.shouldEmulateDistCacheLoad()); |
| |
    // Validate if DistributedCacheEmulator can handle a JobStory without
    // Distributed Cache files properly.
| validateJobConfWithOutDCFiles(conf, jobConf); |
| |
| // Validate if Gridmix can configure dist cache files properly if there are |
| // HDFS-based dist cache files and localFS-based dist cache files in trace |
| // for a job. Set old config properties and validate. |
| validateJobConfWithDCFiles(conf, jobConf); |
| |
| // Use new JobConf as JobStory conf and check if configureDistCacheFiles() |
| // doesn't throw NPE when there are dist cache files set but visibilities |
| // are not set. |
| validateWithOutVisibilities(); |
| } |
| } |