/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix.test.system;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.test.system.JTClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.ZombieJob;
import org.junit.Assert;

/**
* Verifies each Gridmix job against its corresponding job story in a
* trace file.
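*
* <p>A minimal usage sketch. The output path, the job id list, and the
* {@code cluster.getJTClient()} call below are illustrative assumptions
* from the system-test framework, not guarantees made by this class:
* <pre>
*   Configuration conf = new Configuration();
*   JTClient jtClient = cluster.getJTClient();   // assumed MRCluster handle
*   Path gridmixOutput = new Path("/user/gridmix/output"); // hypothetical dir
*   GridmixJobVerification verifier =
*       new GridmixJobVerification(gridmixOutput, conf, jtClient);
*   verifier.verifyGridmixJobsWithJobStories(submittedJobIds);
* </pre>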
*/
public class GridmixJobVerification {
private static final Log LOG = LogFactory.getLog(GridmixJobVerification.class);
private Path path;
private Configuration conf;
private JTClient jtClient;
/**
* Constructs a Gridmix job verifier.
* @param path - path of the gridmix output directory.
* @param conf - cluster configuration.
* @param jtClient - jobtracker client.
*/
public GridmixJobVerification(Path path, Configuration conf,
JTClient jtClient) {
this.path = path;
this.conf = conf;
this.jtClient = jtClient;
}
/**
* Verifies each Gridmix job against its corresponding job story in the
* trace file.
* @param jobids - gridmix job ids.
* @throws IOException - if an I/O error occurs.
* @throws ParseException - if a parse error occurs.
*/
public void verifyGridmixJobsWithJobStories(List<JobID> jobids)
throws IOException, ParseException {
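// Original (trace) and simulated submission times, collected when the
// REPLAY submission policy is in use.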
List<Long> origSubmissionTime = new ArrayList<Long>();
List<Long> simuSubmissionTime = new ArrayList<Long>();
GridmixJobStory gjs = new GridmixJobStory(path, conf);
final Iterator<JobID> ite = jobids.iterator();
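// Local staging directory (hard-coded to /tmp/gridmix-st/) for the
// copied job history and conf files.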
java.io.File destFolder = new java.io.File("/tmp/gridmix-st/");
destFolder.mkdirs();
while (ite.hasNext()) {
long expMapInputBytes = 0;
long expMapOutputBytes = 0;
long expMapInputRecs = 0;
long expMapOutputRecs = 0;
long expReduceInputBytes = 0;
long expReduceOutputBytes = 0;
long expReduceInputRecs = 0;
long expReduceOutputRecs = 0;
JobID currJobId = ite.next();
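// Locate and parse the retired job's history file.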
String historyFilePath = jtClient.getProxy().
getJobHistoryLocationForRetiredJob(currJobId);
Path jhpath = new Path(historyFilePath);
FileSystem fs = jhpath.getFileSystem(conf);
JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
JobHistoryParser.JobInfo jhInfo = jhparser.parse();
Counters counters = jhInfo.getTotalCounters();
fs.copyToLocalFile(jhpath, new Path(destFolder.toString()));
fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"),
new Path(destFolder.toString()));
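// Load the copied job conf to recover the original (trace) job id.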
JobConf jobConf = new JobConf(conf);
jobConf.addResource(new Path("/tmp/gridmix-st/" +
currJobId + "_conf.xml"));
String origJobId = jobConf.get("gridmix.job.original-job-id");
LOG.info("OriginalJobID<->CurrentJobID:" +
origJobId + "<->" + currJobId);
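// Look up the corresponding job story (ZombieJob) in the trace.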
ZombieJob zombieJob = gjs.getZombieJob(JobID.forName(origJobId));
LoggedJob loggedJob = zombieJob.getLoggedJob();
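// Sum the expected map-side bytes and records over all map tasks
// in the trace.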
for (int index = 0; index < zombieJob.getNumberMaps(); index++) {
TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
expMapInputBytes += mapTask.getInputBytes();
expMapOutputBytes += mapTask.getOutputBytes();
expMapInputRecs += mapTask.getInputRecords();
expMapOutputRecs += mapTask.getOutputRecords();
}
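// Sum the expected reduce-side bytes and records over all reduce tasks
// in the trace.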
for (int index = 0; index < zombieJob.getNumberReduces(); index++) {
TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
expReduceInputBytes += reduceTask.getInputBytes();
expReduceOutputBytes += reduceTask.getOutputBytes();
expReduceInputRecs += reduceTask.getInputRecords();
expReduceOutputRecs += reduceTask.getOutputRecords();
}
LOG.info("Verifying the job <" + currJobId + "> and wait for a while...");
Assert.assertEquals("Job id has not matched",
zombieJob.getJobID(), JobID.forName(origJobId));
Assert.assertEquals("Job maps have not matched",
zombieJob.getNumberMaps(),
jhInfo.getTotalMaps());
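// Map-only sleep jobs are expected to run with zero reducers.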
if (!jobConf.getBoolean("gridmix.sleep.maptask-only", false)) {
Assert.assertEquals("Number of job reducers did not match.",
zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
} else {
Assert.assertEquals("Number of job reducers did not match.",
0, jhInfo.getTotalReduces());
}
Assert.assertEquals("Job status has not matched.",
zombieJob.getOutcome().name(),
convertJobStatus(jhInfo.getJobStatus()));
Assert.assertEquals("Job priority has not matched.",
loggedJob.getPriority().toString(), jhInfo.getPriority());
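// With a RoundRobin user resolver, the job should have been submitted
// as a proxy user, not as the login user.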
if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
Assert.assertTrue(currJobId + "has not impersonate with other user.",
!jhInfo.getUsername().equals(UserGroupInformation.
getLoginUser().getShortUserName()));
}
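// Under the REPLAY submission policy, record the original and simulated
// submission times.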
if (jobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
origSubmissionTime.add(zombieJob.getSubmissionTime());
simuSubmissionTime.add(jhInfo.getSubmitTime());
}
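// Data-volume counters are only meaningful for jobs that move real
// data; sleep jobs read and write nothing.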
if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
// The assertions below are commented out due to a bug (MAPREDUCE-2135).
/* Assert.assertTrue("Map input bytes have not matched.<exp:[" +
convertBytes(expMapInputBytes) +"]><act:[" +
convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>",
convertBytes(expMapInputBytes).equals(
convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
Assert.assertTrue("Map output bytes has not matched.<exp:[" +
convertBytes(expMapOutputBytes) + "]><act:[" +
convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>",
convertBytes(expMapOutputBytes).equals(
convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
Assert.assertEquals("Map input records have not matched.<exp:[" +
expMapInputRecs + "]><act:[" +
getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>",
expMapInputRecs, getCounterValue(counters, "MAP_INPUT_RECORDS"));
// The assertions below are commented out due to a bug (MAPREDUCE-2154).
/*Assert.assertEquals("Map output records have not matched.<exp:[" +
expMapOutputRecs + "]><act:[" +
getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>",
expMapOutputRecs, getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
/*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
convertBytes(expReduceInputBytes) + "]><act:[" +
convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>",
convertBytes(expReduceInputBytes).equals(
convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/
/*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" +
convertBytes(expReduceOutputBytes) + "]><act:[" +
convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>",
convertBytes(expReduceOutputBytes).equals(
convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
/*Assert.assertEquals("Reduce output records have not matched.<exp:[" +
expReduceOutputRecs + "]><act:[" + getCounterValue(counters,
"REDUCE_OUTPUT_RECORDS") + "]>",
expReduceOutputRecs, getCounterValue(counters,
"REDUCE_OUTPUT_RECORDS"));*/
/*Assert.assertEquals("Reduce input records have not matched.<exp:[" +
expReduceInputRecs + "]><act:[" + getCounterValue(counters,
"REDUCE_INPUT_RECORDS") + "]>",
expReduceInputRecs,
getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
LOG.info("Done.");
}
}
}
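/**
* Maps a job history status string onto Rumen's job outcome name:
* "SUCCEEDED" becomes "SUCCESS"; any other status passes through unchanged.
*/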
private String convertJobStatus(String jobStatus) {
if (jobStatus.equals("SUCCEEDED")) {
return "SUCCESS";
} else {
return jobStatus;
}
}
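/**
* Converts a byte count into a short human-readable string with a
* binary-unit suffix (B, KB, MB, GB, TB, PB or EB).
*/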
private String convertBytes(long bytesValue) {
int units = 1024;
if (bytesValue < units) {
return String.valueOf(bytesValue) + "B";
} else {
int exp = (int) (Math.log(bytesValue) / Math.log(units));
return String.format("%d%sB", (long) (bytesValue / Math.pow(units, exp)),
"KMGTPE".charAt(exp - 1));
}
}
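/**
* Gets the value of the first counter whose name matches the given key,
* searching every counter group in turn. ParseException is declared for
* caller compatibility but is never thrown here.
* @param counters - job counters to search.
* @param key - counter name to look for.
* @return the counter value, or 0 if no counter with that name exists.
*/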
private long getCounterValue(Counters counters, String key)
throws ParseException {
for (String groupName : counters.getGroupNames()) {
CounterGroup totalGroup = counters.getGroup(groupName);
for (Counter counter : totalGroup) {
if (counter.getName().equals(key)) {
return counter.getValue();
}
}
}
return 0;
}
}