System tests for HDFS and local FS based distributed cache emulation feature. Contributed by Vinay Kumar
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/yahoo-merge@1082633 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
index 5237437..87a91a1 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
@@ -366,7 +366,7 @@
* @return true if the path provided is of a local file system based
* distributed cache file
*/
- private boolean isLocalDistCacheFile(String filePath, String user,
+ static boolean isLocalDistCacheFile(String filePath, String user,
boolean visibility) {
return (!visibility && filePath.contains(user + "/.staging"));
}
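The predicate above classifies a distributed cache file as local-FS based when it is
private (visibility is false) and staged under the submitting user's ".staging"
directory. A minimal sketch of the expected behavior, with a hypothetical path and
user name (illustrative only, not part of this patch):

    boolean localFile = DistributedCacheEmulator.isLocalDistCacheFile(
        "/user/guest/.staging/job_201103170001_0001/files/input.txt",
        "guest", false);   // true: private file under <user>/.staging
    boolean hdfsFile = DistributedCacheEmulator.isLocalDistCacheFile(
        "/tmp/distcache/input.txt", "guest", true);   // false: public HDFS file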
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java
index 92d299f..4c9677b 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java
@@ -74,10 +74,11 @@
@AfterClass
public static void after() throws Exception {
UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
- org.apache.hadoop.fs.FileUtil.fullyDelete(new java.io.File("/tmp/gridmix-st"));
+    org.apache.hadoop.fs.FileUtil.fullyDelete(new java.io.File(
+        System.getProperty("java.io.tmpdir") + "/gridmix-st/"));
cluster.tearDown();
- if (gridmixJS.getJobConf().get("gridmix.user.resolve.class").
- contains("RoundRobin")) {
+    if (gridmixJS != null && gridmixJS.getJobConf().
+        get("gridmix.user.resolve.class").contains("RoundRobin")) {
List<String> proxyUsers = UtilsForGridmix.
listProxyUsers(gridmixJS.getJobConf(),
UserGroupInformation.getLoginUser().getShortUserName());
@@ -97,19 +98,19 @@
public static void runGridmixAndVerify(String [] runtimeValues,
String [] otherValues, String tracePath, int mode) throws Exception {
- jobids = runGridmix(runtimeValues, otherValues, mode);
+ List<JobID> jobids = runGridmix(runtimeValues, otherValues, mode);
gridmixJV = new GridmixJobVerification(
new Path(tracePath), gridmixJS.getJobConf(), jtClient);
gridmixJV.verifyGridmixJobsWithJobStories(jobids);
}
-
+
public static List<JobID> runGridmix(String[] runtimeValues,
String[] otherValues, int mode) throws Exception {
gridmixJS = new GridmixJobSubmission(rtClient.getDaemonConf(),
jtClient, gridmixDir);
gridmixJS.submitJobs(runtimeValues, otherValues, mode);
- jobids = UtilsForGridmix.listGridmixJobIDs(jtClient.getClient(),
- gridmixJS.getGridmixJobCount());
+ List<JobID> jobids = UtilsForGridmix.listGridmixJobIDs(
+ jtClient.getClient(), gridmixJS.getGridmixJobCount());
return jobids;
}
@@ -126,4 +127,10 @@
}
return null;
}
+
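+  /**
+   * Check whether a distributed cache file is local-FS based, i.e. a
+   * private file staged under the submitting user's ".staging" directory.
+   * @param fileName - distributed cache file path.
+   * @param userName - user name of the job owner.
+   * @param visibility - true if the file is publicly visible.
+   * @return true for a local-FS based distributed cache file.
+   */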
+ public static boolean isLocalDistCache(String fileName, String userName,
+ boolean visibility) {
+ return DistributedCacheEmulator.isLocalDistCacheFile(fileName,
+ userName, visibility);
+ }
}
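The helpers above are intended to be driven by concrete system tests. A hedged
sketch of such a caller, assuming hypothetical runtime values, trace path and mode
constant (the exact argument format is defined by GridmixJobSubmission, which is
not part of this hunk):

    // All values below are illustrative assumptions, not part of this patch.
    String [] runtimeValues = {"LOADJOB", "RoundRobinUserResolver", "REPLAY",
        "file:///tmp/sample-trace.json.gz"};
    String [] otherArgs = {
        "-D" + GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=true"};
    GridmixSystemTestCase.runGridmixAndVerify(runtimeValues, otherArgs,
        "file:///tmp/sample-trace.json.gz", 0 /* assumed mode constant */);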
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java
index 5d39686..cddbf05 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java
@@ -177,9 +177,10 @@
dataSize + 0.1 > inputSize || dataSize - 0.1 < inputSize);
JobClient jobClient = jtClient.getClient();
+ int len = jobClient.getAllJobs().length;
LOG.info("Verify the job status after completion of job.");
Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED,
- jobClient.getAllJobs()[0].getRunState());
+ jobClient.getAllJobs()[len-1].getRunState());
}
private void verifyEachNodeSize(Path inputDir) throws IOException {
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
index 2942863..9ac260f 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
@@ -108,4 +108,29 @@
*/
public static final String GRIDMIX_DISTCACHE_ENABLE =
"gridmix.distributed-cache-emulation.enable";
+
+ /**
+ * Gridmix distributed cache visibilities.
+ */
+ public static final String GRIDMIX_DISTCACHE_VISIBILITIES =
+ "mapreduce.job.cache.files.visibilities";
+
+ /**
+ * Gridmix distributed cache files.
+ */
+ public static final String GRIDMIX_DISTCACHE_FILES =
+ "mapreduce.job.cache.files";
+
+ /**
+ * Gridmix distributed cache files size.
+ */
+ public static final String GRIDMIX_DISTCACHE_FILESSIZE =
+ "mapreduce.job.cache.files.filesizes";
+
+ /**
+ * Gridmix distributed cache files time stamp.
+ */
+ public static final String GRIDMIX_DISTCACHE_TIMESTAMP =
+ "mapreduce.job.cache.files.timestamps";
}
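These keys mirror the standard mapreduce.job.cache.files* properties, so the
verification code can read the simulated and the original job configurations the
same way. A small sketch, assuming a populated JobConf named jobConf:

    // Sketch only: the comma-separated lists are parallel arrays.
    String [] files = jobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
    String [] sizes = jobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(",");
    String [] stamps = jobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
    // files[i], sizes[i] and stamps[i] describe the same cache file.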
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
index 77a9d30..97586fb 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
@@ -18,17 +18,25 @@
package org.apache.hadoop.mapred.gridmix.test.system;
import java.io.IOException;
+import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.Set;
+import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Counter;
@@ -45,6 +53,7 @@
import org.junit.Assert;
import java.text.ParseException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapred.gridmix.GridmixSystemTestCase;
/**
* Verifying each Gridmix job with corresponding job story in a trace file.
*/
@@ -54,6 +63,8 @@
private Path path;
private Configuration conf;
private JTClient jtClient;
+ private Map<String, List<JobConf>> simuAndOrigJobsInfo =
+ new HashMap<String, List<JobConf>>();
/**
* Gridmix job verification constructor
* @param path - path of the gridmix output directory.
@@ -76,11 +87,12 @@
public void verifyGridmixJobsWithJobStories(List<JobID> jobids)
throws IOException, ParseException {
- List<Long> origSubmissionTime = new ArrayList<Long>();
- List<Long> simuSubmissionTime = new ArrayList<Long>();
+    SortedMap<Long, String> origSubmissionTime = new TreeMap<Long, String>();
+    SortedMap<Long, String> simuSubmissionTime = new TreeMap<Long, String>();
GridmixJobStory gjs = new GridmixJobStory(path, conf);
final Iterator<JobID> ite = jobids.iterator();
- java.io.File destFolder = new java.io.File("/tmp/gridmix-st/");
+ File destFolder = new File(System.getProperty("java.io.tmpdir") +
+ "/gridmix-st/");
destFolder.mkdir();
while (ite.hasNext()) {
@@ -93,129 +105,491 @@
long expReduceInputRecs = 0;
long expReduceOutputRecs = 0;
- JobID currJobId = ite.next();
- String historyFilePath = jtClient.getProxy().
- getJobHistoryLocationForRetiredJob(currJobId);
- Path jhpath = new Path(historyFilePath);
- FileSystem fs = jhpath.getFileSystem(conf);
- JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
- JobHistoryParser.JobInfo jhInfo = jhparser.parse();
+ JobID simuJobId = ite.next();
+ JobHistoryParser.JobInfo jhInfo = getSimulatedJobHistory(simuJobId);
+ Assert.assertNotNull("Job history not found.", jhInfo);
Counters counters = jhInfo.getTotalCounters();
-
- fs.copyToLocalFile(jhpath,new Path(destFolder.toString()));
- fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"),
- new Path(destFolder.toString()));
- JobConf jobConf = new JobConf(conf);
- jobConf.addResource(new Path("/tmp/gridmix-st/" +
- currJobId + "_conf.xml"));
- String origJobId = jobConf.get("gridmix.job.original-job-id");
+      JobConf simuJobConf = getSimulatedJobConf(simuJobId, destFolder);
+ String origJobId = simuJobConf.get("gridmix.job.original-job-id");
LOG.info("OriginalJobID<->CurrentJobID:" +
- origJobId + "<->" + currJobId);
+ origJobId + "<->" + simuJobId);
ZombieJob zombieJob = gjs.getZombieJob(JobID.forName(origJobId));
- LoggedJob loggedJob = zombieJob.getLoggedJob();
-
- for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
- TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
- expMapInputBytes += mapTask.getInputBytes();
- expMapOutputBytes += mapTask.getOutputBytes();
- expMapInputRecs += mapTask.getInputRecords();
- expMapOutputRecs += mapTask.getOutputRecords();
+ Map<String, Long> mapJobCounters = getJobMapCounters(zombieJob);
+ Map<String, Long> reduceJobCounters = getJobReduceCounters(zombieJob);
+
+      if (simuJobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
+        origSubmissionTime.put(zombieJob.getSubmissionTime(),
+            origJobId + "^" + simuJobId);
+        simuSubmissionTime.put(jhInfo.getSubmitTime(),
+            origJobId + "^" + simuJobId);
}
- for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
- TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
- expReduceInputBytes += reduceTask.getInputBytes();
- expReduceOutputBytes += reduceTask.getOutputBytes();
- expReduceInputRecs += reduceTask.getInputRecords();
- expReduceOutputRecs += reduceTask.getOutputRecords();
- }
+      LOG.info("Verifying the job <" + simuJobId + ">; this may take a while...");
+      verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf);
+      verifyJobMapCounters(counters, mapJobCounters, simuJobConf);
+      verifyJobReduceCounters(counters, reduceJobCounters, simuJobConf);
+      verifyDistributeCache(zombieJob, simuJobConf);
+      setJobDistributedCacheInfo(simuJobId.toString(), simuJobConf,
+          zombieJob.getJobConf());
+ LOG.info("Done.");
+ }
+    verifyDistributedCacheBetweenJobs(simuAndOrigJobsInfo);
+    // In REPLAY mode the collected submission times are compared; for the
+    // other policies both maps stay empty and the check passes trivially.
+    verifyJobSubmissionTime(origSubmissionTime, simuSubmissionTime);
+  }
- LOG.info("Verifying the job <" + currJobId + "> and wait for a while...");
- Assert.assertEquals("Job id has not matched",
- zombieJob.getJobID(), JobID.forName(origJobId));
+ /**
+ * Verify the distributed cache files between the jobs in a gridmix run.
+ * @param jobsInfo - jobConfs of simulated and original jobs as a map.
+ */
+ public void verifyDistributedCacheBetweenJobs(Map<String,
+ List<JobConf>> jobsInfo) {
+    if (jobsInfo.size() > 1) {
+      Map<String, Integer> simJobfilesOccurBtnJobs =
+          getDistcacheFilesOccurrenceBetweenJobs(jobsInfo, 0);
+      Map<String, Integer> origJobfilesOccurBtnJobs =
+          getDistcacheFilesOccurrenceBetweenJobs(jobsInfo, 1);
+      List<Integer> simuOccurList =
+          getMapValuesAsList(simJobfilesOccurBtnJobs);
+      Collections.sort(simuOccurList);
+      List<Integer> origOccurList =
+          getMapValuesAsList(origJobfilesOccurBtnJobs);
+      Collections.sort(origOccurList);
+      Assert.assertEquals("The unique count of distributed cache files in " +
+          "the simulated jobs does not match the unique count in the " +
+          "original jobs.", origOccurList.size(), simuOccurList.size());
+      int index = 0;
+      for (Integer origDistFileCount : origOccurList) {
+        Assert.assertEquals("Distributed cache file reuse in the simulated " +
+            "jobs does not match the reuse in the original jobs.",
+            origDistFileCount, simuOccurList.get(index));
+        index ++;
+      }
+    }
+ }
+
+  private List<Integer> getMapValuesAsList(Map<String, Integer> jobOccurs) {
+    return new ArrayList<Integer>(jobOccurs.values());
+  }
+
- Assert.assertEquals("Job maps have not matched",
- zombieJob.getNumberMaps(),
- jhInfo.getTotalMaps());
+  /**
+   * Get the unique distributed cache files and their occurrence counts
+   * across the jobs.
+   * @param jobsInfo - job's configurations as a map.
+   * @param jobConfIndex - 0 for simulated job configuration and
+   *                       1 for original jobs configuration.
+   * @return - unique distributed cache files and occurrences as map.
+   */
+  private Map<String, Integer> getDistcacheFilesOccurrenceBetweenJobs(
+      Map<String, List<JobConf>> jobsInfo, int jobConfIndex) {
+    Map<String, Integer> filesOccurBtnJobs = new HashMap<String, Integer>();
+    for (String jobId : jobsInfo.keySet()) {
+      List<JobConf> jobconfs = jobsInfo.get(jobId);
+      String [] distCacheFiles = jobconfs.get(jobConfIndex).
+          get(GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
+      String [] distCacheFileTimeStamps = jobconfs.get(jobConfIndex).
+          get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
+      int indx = 0;
+      for (String distCacheFile : distCacheFiles) {
+        // Key a file by its path, time stamp and owning user so that the
+        // same path staged at a different time counts as a different file.
+        String fileKey = distCacheFile + "^" +
+            distCacheFileTimeStamps[indx] + "^" +
+            jobconfs.get(jobConfIndex).getUser();
+        if (filesOccurBtnJobs.get(fileKey) != null) {
+          filesOccurBtnJobs.put(fileKey, filesOccurBtnJobs.get(fileKey) + 1);
+        } else {
+          filesOccurBtnJobs.put(fileKey, 1);
+        }
+        indx ++;
+      }
+    }
+    return filesOccurBtnJobs;
+  }
+
+ private void setJobDistributedCacheInfo(String jobId, JobConf simuJobConf,
+ JobConf origJobConf) {
+    if (simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES) != null) {
+ List<JobConf> jobConfs = new ArrayList<JobConf>();
+ jobConfs.add(simuJobConf);
+ jobConfs.add(origJobConf);
+      simuAndOrigJobsInfo.put(jobId, jobConfs);
+ }
+ }
- if (!jobConf.getBoolean("gridmix.sleep.maptask-only",false)) {
- Assert.assertEquals("Job reducers have not matched",
- zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
+  /**
+   * Verify the job submission order between the jobs in replay mode.
+   * @param origSubmissionTime - sorted map of original jobs submission times.
+   * @param simuSubmissionTime - sorted map of simulated jobs submission times.
+   */
+  public void verifyJobSubmissionTime(SortedMap<Long, String> origSubmissionTime,
+      SortedMap<Long, String> simuSubmissionTime) {
+    Assert.assertEquals("Simulated jobs submission time count does not " +
+        "match the original jobs submission time count.",
+        origSubmissionTime.size(), simuSubmissionTime.size());
+    Iterator<String> origIte = origSubmissionTime.values().iterator();
+    Iterator<String> simuIte = simuSubmissionTime.values().iterator();
+    while (origIte.hasNext()) {
+      Assert.assertEquals("Simulated jobs were not submitted in the same " +
+          "order as the original jobs in REPLAY mode.",
+          origIte.next(), simuIte.next());
+    }
+  }
+
+ /**
+ * It verifies the distributed cache emulation of a job.
+ * @param zombieJob - Original job story.
+ * @param simuJobConf - Simulated job configuration.
+ */
+ public void verifyDistributeCache(ZombieJob zombieJob,
+ JobConf simuJobConf) throws IOException {
+
+ if (simuJobConf.getBoolean(GridMixConfig.GRIDMIX_DISTCACHE_ENABLE, false)) {
+ JobConf origJobConf = zombieJob.getJobConf();
+ checkFileVisibility(simuJobConf);
+      checkDistcacheFiles(simuJobConf, origJobConf);
+      checkFileSizes(simuJobConf, origJobConf);
+      checkFileStamps(simuJobConf, origJobConf);
+ } else {
+      Assert.assertNull("Configuration has distributed cache visibilities " +
+          "without enabled distributed cache emulation.", simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES));
+      Assert.assertNull("Configuration has distributed cache files time " +
+          "stamps without enabled distributed cache emulation.", simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP));
+      Assert.assertNull("Configuration has distributed cache files paths " +
+          "without enabled distributed cache emulation.", simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_FILES));
+      Assert.assertNull("Configuration has distributed cache files sizes " +
+          "without enabled distributed cache emulation.", simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE));
+ }
+ }
+
+  private void checkFileStamps(JobConf simuJobConf, JobConf origJobConf) {
+    // Verify simulated jobs against distributed cache files time stamps.
+    String [] origDCFTS = origJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
+    String [] simuDCFTS = simuJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
+    for (int index = 0; index < origDCFTS.length; index++) {
+      Assert.assertTrue("The simulated distcache file time stamp is not " +
+          "later than the original distcache file time stamp.",
+          Long.parseLong(origDCFTS[index]) < Long.parseLong(simuDCFTS[index]));
+    }
+  }
+
+  private void checkFileVisibility(JobConf simuJobConf) {
+ // Verify simulated jobs against distributed cache files visibilities.
+ String [] distFiles = simuJobConf.get(GridMixConfig.
+ GRIDMIX_DISTCACHE_FILES).split(",");
+ String [] simuDistVisibilities = simuJobConf.get(GridMixConfig.
+ GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
+    List<Boolean> expFileVisibility = new ArrayList<Boolean>();
+    int index = 0;
+    for (String distFile : distFiles) {
+      if (!GridmixSystemTestCase.isLocalDistCache(distFile,
+          simuJobConf.getUser(), Boolean.valueOf(simuDistVisibilities[index]))) {
+ expFileVisibility.add(true);
} else {
- Assert.assertEquals("Job reducers have not matched",
- 0, jhInfo.getTotalReduces());
+ expFileVisibility.add(false);
}
+ index ++;
+ }
+ index = 0;
+ for (String actFileVisibility : simuDistVisibilities) {
+      Assert.assertEquals("Simulated job distributed cache file " +
+          "visibility does not match the expected visibility.",
+          expFileVisibility.get(index), Boolean.valueOf(actFileVisibility));
+ index ++;
+ }
+ }
+
+ private void checkDistcacheFiles(JobConf simuJobConf, JobConf origJobConf)
+ throws IOException {
+ //Verify simulated jobs against distributed cache files.
+ String [] origDistFiles = origJobConf.get(GridMixConfig.
+ GRIDMIX_DISTCACHE_FILES).split(",");
+ String [] simuDistFiles = simuJobConf.get(GridMixConfig.
+ GRIDMIX_DISTCACHE_FILES).split(",");
+ String [] simuDistVisibilities = simuJobConf.get(GridMixConfig.
+ GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
+    Assert.assertEquals("The number of simulated job's distcache files " +
+        "does not match the number of original job's distcache files.",
+        origDistFiles.length, simuDistFiles.length);
- Assert.assertEquals("Job status has not matched.",
- zombieJob.getOutcome().name(),
- convertJobStatus(jhInfo.getJobStatus()));
-
- Assert.assertEquals("Job priority has not matched.",
- loggedJob.getPriority().toString(), jhInfo.getPriority());
-
- if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
- Assert.assertTrue(currJobId + "has not impersonate with other user.",
- !jhInfo.getUsername().equals(UserGroupInformation.
- getLoginUser().getShortUserName()));
+ int index = 0;
+ for (String simDistFile : simuDistFiles) {
+ Path distPath = new Path(simDistFile);
+      if (!GridmixSystemTestCase.isLocalDistCache(simDistFile,
+          simuJobConf.getUser(), Boolean.valueOf(simuDistVisibilities[index]))) {
+        FileSystem fs = distPath.getFileSystem(conf);
+        FileStatus fstat = fs.getFileStatus(distPath);
+        FsPermission permission = fstat.getPermission();
+        Assert.assertEquals("HDFS distributed cache file has wrong " +
+            "permissions for user.", FsAction.READ_WRITE,
+            permission.getUserAction());
+        Assert.assertEquals("HDFS distributed cache file has wrong " +
+            "permissions for group.", FsAction.READ,
+            permission.getGroupAction());
+        Assert.assertEquals("HDFS distributed cache file has wrong " +
+            "permissions for others.", FsAction.READ,
+            permission.getOtherAction());
}
+      index ++;
+    }
+  }
- if (jobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
- origSubmissionTime.add(zombieJob.getSubmissionTime());
- simuSubmissionTime.add(jhInfo.getSubmitTime());
- }
+ private void checkFileSizes(JobConf simuJobConf, JobConf origJobConf) {
+ // Verify simulated jobs against distributed cache files size.
+ List<String> origDistFilesSize = Arrays.asList(origJobConf.
+ get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
+ Collections.sort(origDistFilesSize);
+ List<String> simuDistFilesSize = Arrays.asList(simuJobConf.
+ get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
+ Collections.sort(simuDistFilesSize);
+    Assert.assertEquals("Simulated job's distcache file size count does " +
+        "not match the original job's distcache file size count.",
+        origDistFilesSize.size(), simuDistFilesSize.size());
+    for (int index = 0; index < origDistFilesSize.size(); index ++) {
+      Assert.assertEquals("Simulated job's distcache file size does not " +
+          "match the original job's distcache file size.",
+          origDistFilesSize.get(index), simuDistFilesSize.get(index));
+    }
+ }
- if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
-
- //The below statements have commented due to a bug(MAPREDUCE-2135).
- /* Assert.assertTrue("Map input bytes have not matched.<exp:[" +
- convertBytes(expMapInputBytes) +"]><act:[" +
- convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>",
- convertBytes(expMapInputBytes).equals(
- convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
+ /**
+ * It verifies the simulated job map counters.
+   * @param counters - Simulated job counters from the job history.
+   * @param mapCounters - Expected map counters from the original job trace.
+   * @param jobConf - Simulated job configuration.
+   * @throws ParseException - If a parse error occurs.
+ */
+ public void verifyJobMapCounters(Counters counters,
+ Map<String,Long> mapCounters, JobConf jobConf) throws ParseException {
+ if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
+ //The below statements have commented due to a bug(MAPREDUCE-2135).
+ /*Assert.assertTrue("Map input bytes have not matched.<exp:[" +
+ convertBytes(mapCounters.get("MAP_INPUT_BYTES").longValue()) +"]><act:[" +
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>",
+ convertBytes(mapCounters.get("MAP_INPUT_BYTES").longValue()).equals(
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
- Assert.assertTrue("Map output bytes has not matched.<exp:[" +
- convertBytes(expMapOutputBytes) + "]><act:[" +
- convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>",
- convertBytes(expMapOutputBytes).equals(
- convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
+ Assert.assertTrue("Map output bytes has not matched.<exp:[" +
+ convertBytes(mapCounters.get("MAP_OUTPUT_BYTES").longValue()) + "]><act:[" +
+ convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>",
+ convertBytes(mapCounters.get("MAP_OUTPUT_BYTES").longValue()).equals(
+ convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
- Assert.assertEquals("Map input records have not matched.<exp:[" +
- expMapInputRecs + "]><act:[" +
- getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>",
- expMapInputRecs, getCounterValue(counters, "MAP_INPUT_RECORDS"));
+ Assert.assertEquals("Map input records have not matched.<exp:[" +
+ mapCounters.get("MAP_INPUT_RECS").longValue() + "]><act:[" +
+ getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>",
+ mapCounters.get("MAP_INPUT_RECS").longValue(),
+ getCounterValue(counters, "MAP_INPUT_RECORDS"));
- // The below statements have commented due to a bug(MAPREDUCE-2154).
- /*Assert.assertEquals("Map output records have not matched.<exp:[" +
- expMapOutputRecs + "]><act:[" +
- getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>",
- expMapOutputRecs, getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
+ // The below statements have commented due to a bug(MAPREDUCE-2154).
+ /*Assert.assertEquals("Map output records have not matched.<exp:[" +
+ mapCounters.get("MAP_OUTPUT_RECS").longValue() + "]><act:[" +
+ getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>",
+ mapCounters.get("MAP_OUTPUT_RECS").longValue(),
+ getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
+ } else {
+      Assert.assertTrue("Map Input Bytes are zero",
+          getCounterValue(counters, "HDFS_BYTES_READ") != 0);
+      Assert.assertTrue("Map Input Records are zero",
+          getCounterValue(counters, "MAP_INPUT_RECORDS") != 0);
+ }
+ }
- /*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
- convertBytes(expReduceInputBytes) + "]><act:[" +
- convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>",
- convertBytes(expReduceInputBytes).equals(
- convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/
+ /**
+ * It verifies the simulated job reduce counters.
+   * @param counters - Simulated job counters from the job history.
+   * @param reduceCounters - Expected reduce counters from the original job trace.
+   * @param jobConf - Simulated job configuration.
+   * @throws ParseException - If a parse error occurs.
+ */
+ public void verifyJobReduceCounters(Counters counters,
+ Map<String,Long> reduceCounters, JobConf jobConf) throws ParseException {
+ if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
+ /*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
+ convertBytes(reduceCounters.get("REDUCE_INPUT_BYTES").longValue()) + "]><act:[" +
+ convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>",
+ convertBytes(reduceCounters.get("REDUCE_INPUT_BYTES").longValue()).equals(
+ convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/
- /*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" +
- convertBytes(expReduceOutputBytes) + "]><act:[" +
- convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>",
- convertBytes(expReduceOutputBytes).equals(
- convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
+ /*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" +
+ convertBytes(reduceCounters.get("REDUCE_OUTPUT_BYTES").longValue()) + "]><act:[" +
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>",
+ convertBytes(reduceCounters.get("REDUCE_OUTPUT_BYTES").longValue()).equals(
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
- /*Assert.assertEquals("Reduce output records have not matched.<exp:[" +
- expReduceOutputRecs + "]><act:[" + getCounterValue(counters,
- "REDUCE_OUTPUT_RECORDS") + "]>",
- expReduceOutputRecs, getCounterValue(counters,
- "REDUCE_OUTPUT_RECORDS"));*/
-
- /*Assert.assertEquals("Reduce input records have not matched.<exp:[" +
- expReduceInputRecs + "]><act:[" + getCounterValue(counters,
- "REDUCE_INPUT_RECORDS") + "]>",
- expReduceInputRecs,
- getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
- LOG.info("Done.");
- }
+ /*Assert.assertEquals("Reduce output records have not matched.<exp:[" +
+ reduceCounters.get("REDUCE_OUTPUT_RECS").longValue() + "]><act:[" + getCounterValue(counters,
+ "REDUCE_OUTPUT_RECORDS") + "]>",
+ reduceCounters.get("REDUCE_OUTPUT_RECS").longValue(), getCounterValue(counters,
+ "REDUCE_OUTPUT_RECORDS"));*/
+
+ /*Assert.assertEquals("Reduce input records have not matched.<exp:[" +
+ reduceCounters.get("REDUCE_INPUT_RECS").longValue() + "]><act:[" + getCounterValue(counters,
+ "REDUCE_INPUT_RECORDS") + "]>",
+ reduceCounters.get("REDUCE_INPUT_RECS").longValue(),
+ getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
+ } else {
+      Assert.assertTrue("Reduce output records are not zero for sleep job.",
+          getCounterValue(counters, "REDUCE_OUTPUT_RECORDS") == 0);
+      Assert.assertTrue("Reduce output bytes are not zero for sleep job.",
+          getCounterValue(counters, "HDFS_BYTES_WRITTEN") == 0);
+ }
+ }
+
+ /**
+ * It verifies the gridmix simulated job summary.
+   * @param zombieJob - Original job story.
+   * @param jhInfo - Simulated job history info.
+   * @param jobConf - Simulated job configuration.
+ * @throws IOException - if an I/O error occurs.
+ */
+ public void verifySimulatedJobSummary(ZombieJob zombieJob,
+ JobHistoryParser.JobInfo jhInfo, JobConf jobConf) throws IOException {
+ Assert.assertEquals("Job id has not matched",
+ zombieJob.getJobID(), JobID.forName(
+ jobConf.get("gridmix.job.original-job-id")));
+
+ Assert.assertEquals("Job maps have not matched",
+ zombieJob.getNumberMaps(), jhInfo.getTotalMaps());
+
+ if (!jobConf.getBoolean("gridmix.sleep.maptask-only",false)) {
+ Assert.assertEquals("Job reducers have not matched",
+ zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
+ } else {
+ Assert.assertEquals("Job reducers have not matched",
+ 0, jhInfo.getTotalReduces());
+ }
+
+ Assert.assertEquals("Job status has not matched.",
+ zombieJob.getOutcome().name(),
+ convertJobStatus(jhInfo.getJobStatus()));
+
+ LoggedJob loggedJob = zombieJob.getLoggedJob();
+ Assert.assertEquals("Job priority has not matched.",
+ loggedJob.getPriority().toString(), jhInfo.getPriority());
+
+ if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
+      Assert.assertTrue(jhInfo.getJobId().toString() +
+          " was not submitted as an impersonated (proxy) user.",
+          !jhInfo.getUsername().equals(
+              UserGroupInformation.getLoginUser().getShortUserName()));
+ }
+ }
+
+ /**
+ * Get the original job map counters from a trace.
+ * @param zombieJob - Original job story.
+ * @return - map counters as a map.
+ */
+ public Map<String, Long> getJobMapCounters(ZombieJob zombieJob) {
+ long expMapInputBytes = 0;
+ long expMapOutputBytes = 0;
+ long expMapInputRecs = 0;
+ long expMapOutputRecs = 0;
+ Map<String,Long> mapCounters = new HashMap<String,Long>();
+ for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
+ TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
+ expMapInputBytes += mapTask.getInputBytes();
+ expMapOutputBytes += mapTask.getOutputBytes();
+ expMapInputRecs += mapTask.getInputRecords();
+ expMapOutputRecs += mapTask.getOutputRecords();
+ }
+ mapCounters.put("MAP_INPUT_BYTES", expMapInputBytes);
+ mapCounters.put("MAP_OUTPUT_BYTES", expMapOutputBytes);
+ mapCounters.put("MAP_INPUT_RECS", expMapInputRecs);
+ mapCounters.put("MAP_OUTPUT_RECS", expMapOutputRecs);
+ return mapCounters;
+ }
+
+ /**
+ * Get the original job reduce counters from a trace.
+ * @param zombieJob - Original job story.
+ * @return - reduce counters as a map.
+ */
+ public Map<String,Long> getJobReduceCounters(ZombieJob zombieJob) {
+ long expReduceInputBytes = 0;
+ long expReduceOutputBytes = 0;
+ long expReduceInputRecs = 0;
+ long expReduceOutputRecs = 0;
+ Map<String,Long> reduceCounters = new HashMap<String,Long>();
+ for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
+ TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
+ expReduceInputBytes += reduceTask.getInputBytes();
+ expReduceOutputBytes += reduceTask.getOutputBytes();
+ expReduceInputRecs += reduceTask.getInputRecords();
+ expReduceOutputRecs += reduceTask.getOutputRecords();
+ }
+ reduceCounters.put("REDUCE_INPUT_BYTES", expReduceInputBytes);
+ reduceCounters.put("REDUCE_OUTPUT_BYTES", expReduceOutputBytes);
+ reduceCounters.put("REDUCE_INPUT_RECS", expReduceInputRecs);
+ reduceCounters.put("REDUCE_OUTPUT_RECS", expReduceOutputRecs);
+ return reduceCounters;
+ }
+
+ /**
+ * Get the simulated job configuration of a job.
+ * @param simulatedJobID - Simulated job id.
+ * @param tmpJHFolder - temporary job history folder location.
+ * @return - simulated job configuration.
+ * @throws IOException - If an I/O error occurs.
+ */
+ public JobConf getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder)
+ throws IOException{
+ FileSystem fs = null;
+ try {
+ String historyFilePath = jtClient.getProxy().
+ getJobHistoryLocationForRetiredJob(simulatedJobID);
+ Path jhpath = new Path(historyFilePath);
+ fs = jhpath.getFileSystem(conf);
+      fs.copyToLocalFile(jhpath, new Path(tmpJHFolder.toString()));
+ fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"),
+ new Path(tmpJHFolder.toString()));
+ JobConf jobConf = new JobConf();
+ jobConf.addResource(new Path(tmpJHFolder.toString() + "/" +
+ simulatedJobID + "_conf.xml"));
+ jobConf.reloadConfiguration();
+ return jobConf;
+    } finally {
+      // Guard against an NPE when the history lookup fails before fs is set.
+      if (fs != null) {
+        fs.close();
+      }
+ }
+ }
+
+ /**
+ * Get the simulated job history of a job.
+ * @param simulatedJobID - simulated job id.
+ * @return - simulated job information.
+ * @throws IOException - if an I/O error occurs.
+ */
+ public JobHistoryParser.JobInfo getSimulatedJobHistory(JobID simulatedJobID)
+ throws IOException {
+ FileSystem fs = null;
+ try {
+ String historyFilePath = jtClient.getProxy().
+ getJobHistoryLocationForRetiredJob(simulatedJobID);
+ Path jhpath = new Path(historyFilePath);
+ fs = jhpath.getFileSystem(conf);
+ JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
+ JobHistoryParser.JobInfo jhInfo = jhparser.parse();
+ return jhInfo;
+ } finally {
+      // Guard against an NPE when the history lookup fails before fs is set.
+      if (fs != null) {
+        fs.close();
+      }
}
}
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java
index c323cd2..97696df 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java
@@ -266,7 +266,8 @@
JobStatus js = jobStatus[numJobs - index];
JobID jobid = js.getJobID();
String jobName = js.getJobName();
- if (!jobName.equals("GRIDMIX_GENERATE_INPUT_DATA")) {
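+      // Skip Gridmix's internal data-generation jobs (input data and
+      // distributed cache data) so that only simulated jobs are returned.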
+ if (!jobName.equals("GRIDMIX_GENERATE_INPUT_DATA") &&
+ !jobName.equals("GRIDMIX_GENERATE_DISTCACHE_DATA")) {
jobids.add(jobid);
}
}