/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapred.gridmix;

import static org.junit.Assert.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Validate emulation of distributed cache load in gridmix simulated jobs.
 */
public class TestDistCacheEmulation {
private DistributedCacheEmulator dce = null;

@BeforeClass
public static void init() throws IOException {
GridmixTestUtils.initCluster();
}

@AfterClass
public static void shutDown() throws IOException {
GridmixTestUtils.shutdownCluster();
}

  /**
   * Validate the dist cache files generated by the GenerateDistCacheData job.
   *
   * @param jobConf configuration of the GenerateDistCacheData job
   * @param sortedFileSizes array of sorted distributed cache file sizes
   * @throws IOException
   * @throws FileNotFoundException
   */
private void validateDistCacheData(JobConf jobConf, long[] sortedFileSizes)
throws FileNotFoundException, IOException {
Path distCachePath = dce.getDistributedCacheDir();
String filesListFile =
jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST);
FileSystem fs = FileSystem.get(jobConf);
    // Validate that the distributed cache files list file exists directly
    // under the distributed cache directory.
Path listFile = new Path(filesListFile);
assertTrue("Path of Distributed Cache files list file is wrong.",
distCachePath.equals(listFile.getParent().makeQualified(fs)));
// Delete the dist cache files list file
assertTrue("Failed to delete distributed Cache files list file " + listFile,
fs.delete(listFile));
List<Long> fileSizes = new ArrayList<Long>();
for (long size : sortedFileSizes) {
fileSizes.add(size);
}
// validate dist cache files after deleting the 'files list file'
validateDistCacheFiles(fileSizes, distCachePath);
}

  /**
   * Validate private/public distributed cache files.
   *
   * @param filesSizesExpected list of sizes of expected dist cache files
   * @param distCacheDir the distributed cache dir to be validated
   * @throws IOException
   * @throws FileNotFoundException
   */
  private void validateDistCacheFiles(List<Long> filesSizesExpected,
      Path distCacheDir) throws FileNotFoundException, IOException {
RemoteIterator<LocatedFileStatus> iter =
GridmixTestUtils.dfs.listFiles(distCacheDir, false);
int numFiles = filesSizesExpected.size();
for (int i = 0; i < numFiles; i++) {
assertTrue("Missing distributed cache files.", iter.hasNext());
LocatedFileStatus stat = iter.next();
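      // List.remove(Object) returns true only if an equal element was present,
      // so the assert below both checks this file's size and consumes that
      // size from the expected list, letting each expected size match once.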
assertTrue("File size of distributed cache file "
+ stat.getPath().toUri().getPath() + " is wrong.",
filesSizesExpected.remove(stat.getLen()));
FsPermission perm = stat.getPermission();
assertEquals("Wrong permissions for distributed cache file "
+ stat.getPath().toUri().getPath(),
new FsPermission((short)0644), perm);
}
assertFalse("Number of files under distributed cache dir is wrong.",
iter.hasNext());
}

  /**
   * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache
   * file in the given Configuration object <code>conf</code>.
   *
   * @param conf configuration where dist cache config properties are to be set
   * @param useOldProperties <code>true</code> if the deprecated config
   *          properties are to be set instead of the current ones
   * @return array of sorted HDFS-based distributed cache file sizes
   * @throws IOException
   */
private long[] configureDummyDistCacheFiles(Configuration conf,
boolean useOldProperties) throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
conf.set(MRJobConfig.USER_NAME, user);
// Set some dummy dist cache files in gridmix configuration so that they go
// into the configuration of JobStory objects.
String[] distCacheFiles = {"hdfs:///tmp/file1.txt",
"/tmp/" + user + "/.staging/job_1/file2.txt",
"hdfs:///user/user1/file3.txt",
"/home/user2/file4.txt",
"subdir1/file5.txt",
"subdir2/file6.gz"};
String[] fileSizes = {"400", "2500", "700", "1200", "1500", "500"};
String[] visibilities = {"true", "false", "false", "true", "true", "false"};
String[] timeStamps = {"1234", "2345", "34567", "5434", "125", "134"};
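    // The sizes, visibilities and timestamps arrays are parallel to
    // distCacheFiles: entry i of each array describes dist cache file i.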
if (useOldProperties) {
conf.setStrings("mapred.cache.files", distCacheFiles);
conf.setStrings("mapred.cache.files.filesizes", fileSizes);
conf.setStrings("mapred.cache.files.visibilities", visibilities);
conf.setStrings("mapred.cache.files.timestamps", timeStamps);
} else {
conf.setStrings(MRJobConfig.CACHE_FILES, distCacheFiles);
conf.setStrings(MRJobConfig.CACHE_FILES_SIZES, fileSizes);
conf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, visibilities);
conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timeStamps);
}
    // The local-FS-based dist cache file whose path contains <user>/.staging
    // is not created on HDFS, so its size (2500) is not added to
    // sortedFileSizes.
long[] sortedFileSizes = new long[] {1500, 1200, 700, 500, 400};
return sortedFileSizes;
}

  /**
   * Runs setupGenerateDistCacheData() on a new DistributedCacheEmulator and
   * returns the jobConf. Fills the array <code>sortedFileSizes</code> that
   * can be used for validation, and validates the exit code returned by
   * setupGenerateDistCacheData().
   *
   * @param generate true if the -generate option is specified
   * @param sortedFileSizes sorted HDFS-based distributed cache file sizes
   * @throws IOException
   * @throws InterruptedException
   */
private JobConf runSetupGenerateDistCacheData(boolean generate,
long[] sortedFileSizes) throws IOException, InterruptedException {
Configuration conf = new Configuration();
long[] fileSizes = configureDummyDistCacheFiles(conf, false);
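    // Copy the expected sizes into the caller-supplied array so that the
    // caller can validate against them after this method returns.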
System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length);
    // Job stories of all 3 jobs will have the same dist cache files in their
    // configurations.
final int numJobs = 3;
DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf);
JobConf jobConf =
GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
Path ioPath = new Path("testSetupGenerateDistCacheData")
.makeQualified(GridmixTestUtils.dfs);
FileSystem fs = FileSystem.get(jobConf);
if (fs.exists(ioPath)) {
fs.delete(ioPath, true);
}
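    // ioPath is created world-writable (0777), presumably so that the
    // generated dist cache data can be written under it irrespective of the
    // user the simulated jobs run as.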
FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
dce = createDistributedCacheEmulator(jobConf, ioPath, generate);
int exitCode = dce.setupGenerateDistCacheData(jobProducer);
int expectedExitCode = generate ? 0 : dce.MISSING_DIST_CACHE_FILES_ERROR;
assertEquals("setupGenerateDistCacheData failed.",
expectedExitCode, exitCode);
    // Reset the dist cache config properties before returning the jobConf.
resetDistCacheConfigProperties(jobConf);
return jobConf;
}

  /**
   * Reset the config properties related to Distributed Cache in the given
   * job configuration <code>jobConf</code>.
   *
   * @param jobConf job configuration
   */
private void resetDistCacheConfigProperties(JobConf jobConf) {
// reset current/latest property names
jobConf.setStrings(MRJobConfig.CACHE_FILES, "");
jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "");
jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "");
jobConf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, "");
// reset old property names
jobConf.setStrings("mapred.cache.files", "");
jobConf.setStrings("mapred.cache.files.filesizes", "");
jobConf.setStrings("mapred.cache.files.visibilities", "");
jobConf.setStrings("mapred.cache.files.timestamps", "");
}

  /**
   * Validate that the GenerateDistCacheData job creates dist cache files
   * properly.
   *
   * @throws Exception
   */
@Test
public void testGenerateDistCacheData() throws Exception {
long[] sortedFileSizes = new long[5];
JobConf jobConf =
runSetupGenerateDistCacheData(true, sortedFileSizes);
GridmixJob gridmixJob = new GenerateDistCacheData(jobConf);
Job job = gridmixJob.call();
assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.",
0, job.getNumReduceTasks());
assertTrue("GenerateDistCacheData job failed.",
job.waitForCompletion(false));
validateDistCacheData(jobConf, sortedFileSizes);
}

  /**
   * Validate setupGenerateDistCacheData by validating
   * <ul>
   * <li>permissions of the distributed cache directories and</li>
   * <li>content of the generated sequence file. This includes validation of
   * dist cache file paths and their file sizes.</li>
   * </ul>
   */
private void validateSetupGenDC(JobConf jobConf, long[] sortedFileSizes)
throws IOException, InterruptedException {
// build things needed for validation
long sumOfFileSizes = 0;
for (int i = 0; i < sortedFileSizes.length; i++) {
sumOfFileSizes += sortedFileSizes[i];
}
FileSystem fs = FileSystem.get(jobConf);
assertEquals("Number of distributed cache files to be generated is wrong.",
sortedFileSizes.length,
jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
assertEquals("Total size of dist cache files to be generated is wrong.",
sumOfFileSizes, jobConf.getLong(
GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
Path filesListFile = new Path(jobConf.get(
GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
FileStatus stat = fs.getFileStatus(filesListFile);
assertEquals("Wrong permissions of dist Cache files list file "
+ filesListFile, new FsPermission((short)0644), stat.getPermission());
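    // Read the generated files-list file back through GenDCDataFormat (the
    // input format of the GenerateDistCacheData job), using a single split
    // that covers the whole file.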
InputSplit split =
new FileSplit(filesListFile, 0, stat.getLen(), (String[])null);
TaskAttemptContext taskContext =
MapReduceTestUtil.createDummyMapTaskAttemptContext(jobConf);
RecordReader<LongWritable, BytesWritable> reader =
new GenerateDistCacheData.GenDCDataFormat().createRecordReader(
split, taskContext);
MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable>
mapContext = new MapContextImpl<LongWritable, BytesWritable,
NullWritable, BytesWritable>(jobConf, taskContext.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mapContext);
// start validating setupGenerateDistCacheData
doValidateSetupGenDC(reader, fs, sortedFileSizes);
}

  /**
   * Validate setupGenerateDistCacheData by validating
   * <ul>
   * <li>permissions of the distributed cache directory and</li>
   * <li>content of the generated sequence file. This includes validation of
   * dist cache file paths and their file sizes.</li>
   * </ul>
   */
private void doValidateSetupGenDC(RecordReader<LongWritable, BytesWritable>
reader, FileSystem fs, long[] sortedFileSizes)
throws IOException, InterruptedException {
// Validate permissions of dist cache directory
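    // The dist cache dir needs the execute bit for "other": without it, tasks
    // running as other users cannot traverse the directory to reach the
    // public dist cache files stored under it.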
Path distCacheDir = dce.getDistributedCacheDir();
assertEquals("Wrong permissions for distributed cache dir " + distCacheDir,
fs.getFileStatus(distCacheDir).getPermission()
.getOtherAction().and(FsAction.EXECUTE), FsAction.EXECUTE);
// Validate the content of the sequence file generated by
// dce.setupGenerateDistCacheData().
    LongWritable key;
    BytesWritable val;
for (int i = 0; i < sortedFileSizes.length; i++) {
assertTrue("Number of files written to the sequence file by "
+ "setupGenerateDistCacheData is less than the expected.",
reader.nextKeyValue());
key = reader.getCurrentKey();
val = reader.getCurrentValue();
long fileSize = key.get();
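      // Each record maps the target file size (key) to the bytes of the dist
      // cache file path (value).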
String file = new String(val.getBytes(), 0, val.getLength());
// Dist Cache files should be sorted based on file size.
assertEquals("Dist cache file size is wrong.",
sortedFileSizes[i], fileSize);
      // Validate the dist cache file path: the parent dir of each dist cache
      // file should be the dist cache dir itself.
      Path parent = new Path(file).getParent().makeQualified(fs);
assertTrue("Public dist cache file path is wrong.",
distCacheDir.equals(parent));
}
}

  /**
   * Test that DistributedCacheEmulator's setup of GenerateDistCacheData works
   * as expected.
   *
   * @throws IOException
   * @throws InterruptedException
   */
@Test
public void testSetupGenerateDistCacheData()
throws IOException, InterruptedException {
long[] sortedFileSizes = new long[5];
JobConf jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes);
validateSetupGenDC(jobConf, sortedFileSizes);
    // Verify that the correct exit code is returned when the -generate option
    // is not specified and the distributed cache files are missing from the
    // expected path.
runSetupGenerateDistCacheData(false, sortedFileSizes);
}

  /**
   * Create a DistributedCacheEmulator object and initialize it by calling
   * init() on it with a dummy trace. Also configure the pseudo local FS.
   */
private DistributedCacheEmulator createDistributedCacheEmulator(
Configuration conf, Path ioPath, boolean generate) throws IOException {
DistributedCacheEmulator dce =
new DistributedCacheEmulator(conf, ioPath);
JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
jobCreator.setDistCacheEmulator(dce);
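    // "dummytrace" is only a placeholder trace name; these tests never read a
    // real trace file.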
dce.init("dummytrace", jobCreator, generate);
return dce;
}

/**
* Test the configuration property for disabling/enabling emulation of
* distributed cache load.
*/
@Test
public void testDistCacheEmulationConfigurability() throws IOException {
Configuration conf = new Configuration();
JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
new JobConf(conf));
Path ioPath = new Path("testDistCacheEmulationConfigurability")
.makeQualified(GridmixTestUtils.dfs);
FileSystem fs = FileSystem.get(jobConf);
FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
// default config
dce = createDistributedCacheEmulator(jobConf, ioPath, false);
assertTrue("Default configuration of "
+ DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ " is wrong.", dce.shouldEmulateDistCacheLoad());
// config property set to false
jobConf.setBoolean(
DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false);
dce = createDistributedCacheEmulator(jobConf, ioPath, false);
assertFalse("Disabling of emulation of distributed cache load by setting "
+ DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ " to false is not working.", dce.shouldEmulateDistCacheLoad());
}

  /**
   * Verify that DistributedCacheEmulator can configure distributed cache files
   * for a simulated job when the job conf from the trace has no dist cache
   * files.
   *
   * @param conf configuration for the simulated job to be run
   * @param jobConf job configuration of the original cluster's job, obtained
   *          from the trace
   * @throws IOException
   */
private void validateJobConfWithOutDCFiles(Configuration conf,
JobConf jobConf) throws IOException {
    // Validate that Gridmix configures dist cache files properly when the
    // trace has neither HDFS-based nor local-FS-based dist cache files for a
    // job.
dce.configureDistCacheFiles(conf, jobConf);
assertNull("Distributed cache files configured by GridMix is wrong.",
conf.get(MRJobConfig.CACHE_FILES));
assertNull("Distributed cache files configured by Gridmix through -files "
+ "option is wrong.", conf.get("tmpfiles"));
}

  /**
   * Verify that DistributedCacheEmulator can configure distributed cache files
   * for a simulated job when the job conf from the trace has HDFS-based and
   * local-FS-based dist cache files.
   * <br>Also validate that Gridmix can handle/read deprecated config
   * properties like mapred.cache.files.filesizes and
   * mapred.cache.files.visibilities from the trace file.
   *
   * @param conf configuration for the simulated job to be run
   * @param jobConf job configuration of the original cluster's job, obtained
   *          from the trace
   * @throws IOException
   */
private void validateJobConfWithDCFiles(Configuration conf,
JobConf jobConf) throws IOException {
long[] sortedFileSizes = configureDummyDistCacheFiles(jobConf, true);
    // Validate that Gridmix can handle deprecated config properties like
    // mapred.cache.files.filesizes and mapred.cache.files.visibilities.
    // With 1 local-FS-based and 5 HDFS-based dist cache files, the expected
    // total dist cache file count is 6.
assertEquals("Gridmix is not able to extract dist cache file sizes.",
6, jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES).length);
assertEquals("Gridmix is not able to extract dist cache file visibilities.",
6, jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES).length);
dce.configureDistCacheFiles(conf, jobConf);
assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+ "wrong.", sortedFileSizes.length,
conf.getStrings(MRJobConfig.CACHE_FILES).length);
assertEquals("Configuring of local-FS-based dist cache files by gridmix is "
+ "wrong.", 1, conf.getStrings("tmpfiles").length);
}

  /**
   * Verify that configureDistCacheFiles() works fine when distributed cache
   * files are set but their visibilities are not. This handles history traces
   * of older Hadoop versions, which had no notion of private/public
   * distributed cache files.
   *
   * @throws IOException
   */
private void validateWithOutVisibilities() throws IOException {
    Configuration conf = new Configuration(); // conf for the simulated job
JobConf jobConf = new JobConf();
String user = "user1";
jobConf.setUser(user);
String[] files = {"/tmp/hdfs1.txt", "/tmp/"+ user + "/.staging/file1"};
jobConf.setStrings(MRJobConfig.CACHE_FILES, files);
jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "12,200");
jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "56789,98345");
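    // MRJobConfig.CACHE_FILE_VISIBILITIES is deliberately left unset here to
    // mimic history traces of older Hadoop versions.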
dce.configureDistCacheFiles(conf, jobConf);
assertEquals("Configuring of HDFS-based dist cache files by gridmix is "
+ "wrong.", files.length,
conf.getStrings(MRJobConfig.CACHE_FILES).length);
assertNull("Configuring of local-FS-based dist cache files by gridmix is "
+ "wrong.", conf.get("tmpfiles"));
}

  /**
   * Test that Gridmix properly configures the config properties related to
   * Distributed Cache, and that it can handle the deprecated Distributed
   * Cache config properties.
   *
   * @throws IOException
   */
@Test
public void testDistCacheFilesConfiguration() throws IOException {
Configuration conf = new Configuration();
JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
new JobConf(conf));
Path ioPath = new Path("testDistCacheEmulationConfigurability")
.makeQualified(GridmixTestUtils.dfs);
FileSystem fs = FileSystem.get(jobConf);
FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
// default config
dce = createDistributedCacheEmulator(jobConf, ioPath, false);
assertTrue("Default configuration of "
+ DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+ " is wrong.", dce.shouldEmulateDistCacheLoad());
    // Validate that DistributedCacheEmulator properly handles a JobStory
    // without Distributed Cache files.
validateJobConfWithOutDCFiles(conf, jobConf);
    // Validate that Gridmix configures dist cache files properly when the
    // trace has both HDFS-based and local-FS-based dist cache files for a
    // job. Set the deprecated config properties and validate.
validateJobConfWithDCFiles(conf, jobConf);
    // Use a fresh JobConf as the JobStory conf and check that
    // configureDistCacheFiles() does not throw an NPE when dist cache files
    // are set but visibilities are not.
validateWithOutVisibilities();
}
}