blob: 2815f248e3db00eb53f003dd3e8c5c7c3eccd2d8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.io.InputStream;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.log4j.Level;
import static org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS;
import static org.apache.hadoop.mapreduce.TaskCounter.MAP_OUTPUT_BYTES;
import static org.apache.hadoop.mapreduce.TaskCounter.MAP_OUTPUT_RECORDS;
import static org.apache.hadoop.mapreduce.TaskCounter.REDUCE_INPUT_RECORDS;
import static org.apache.hadoop.mapreduce.TaskCounter.REDUCE_OUTPUT_RECORDS;
import static org.apache.hadoop.mapreduce.TaskCounter.REDUCE_SHUFFLE_BYTES;
import static org.apache.hadoop.mapreduce.TaskCounter.SPLIT_RAW_BYTES;
public class TestGridmixSubmission {
static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
public static final Log LOG = LogFactory.getLog(Gridmix.class);
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
).getLogger().setLevel(Level.DEBUG);
}
private static final int NJOBS = 3;
private static final long GENDATA = 30; // in megabytes
private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
@BeforeClass
public static void init() throws IOException {
GridmixTestUtils.initCluster();
}
@AfterClass
public static void shutDown() throws IOException {
GridmixTestUtils.shutdownCluster();
}
static class TestMonitor extends JobMonitor {
static final long SLOPBYTES = 1024;
private final int expected;
private final BlockingQueue<Job> retiredJobs;
public TestMonitor(int expected, Statistics stats) {
super(stats);
this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>();
}
public void verify(ArrayList<JobStory> submitted) throws Exception {
final ArrayList<Job> succeeded = new ArrayList<Job>();
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
for (JobStory spec : submitted) {
sub.put(spec.getJobID().toString(), spec);
}
final JobClient client = new JobClient(
GridmixTestUtils.mrCluster.createJobConf());
for (Job job : succeeded) {
final String jobName = job.getJobName();
Configuration conf = job.getConfiguration();
if (GenerateData.JOB_NAME.equals(jobName)) {
verifyQueue(conf, jobName);
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
assertTrue("Mismatched data gen", // +/- 100k for logs
(GENDATA << 20) < generated.getLength() + GENSLOP ||
(GENDATA << 20) > generated.getLength() - GENSLOP);
FileStatus[] outstat = GridmixTestUtils.dfs.listStatus(out);
assertEquals("Mismatched job count", NJOBS, outstat.length);
continue;
} else if (GenerateDistCacheData.JOB_NAME.equals(jobName)) {
verifyQueue(conf, jobName);
continue;
}
if (!conf.getBoolean(
GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
assertEquals(" Improper queue for " + jobName + " " ,
conf.get(MRJobConfig.QUEUE_NAME), "q1" );
} else {
assertEquals(" Improper queue for " + jobName + " ",
conf.get(MRJobConfig.QUEUE_NAME),
sub.get(conf.get(Gridmix.ORIGINAL_JOB_ID)).getQueueName());
}
final String originalJobId = conf.get(Gridmix.ORIGINAL_JOB_ID);
final JobStory spec = sub.get(originalJobId);
assertNotNull("No spec for " + jobName, spec);
assertNotNull("No counters for " + jobName, job.getCounters());
final String originalJobName = spec.getName();
System.out.println("originalJobName=" + originalJobName
+ ";GridmixJobName=" + jobName + ";originalJobID=" + originalJobId);
assertTrue("Original job name is wrong.", originalJobName.equals(
conf.get(Gridmix.ORIGINAL_JOB_NAME)));
// Gridmix job seqNum contains 6 digits
int seqNumLength = 6;
String jobSeqNum = new DecimalFormat("000000").format(
conf.getInt(GridmixJob.GRIDMIX_JOB_SEQ, -1));
// Original job name is of the format MOCKJOB<6 digit sequence number>
// because MockJob jobNames are of this format.
assertTrue(originalJobName.substring(
originalJobName.length() - seqNumLength).equals(jobSeqNum));
assertTrue("Gridmix job name is not in the expected format.",
jobName.equals(
GridmixJob.JOB_NAME_PREFIX + jobSeqNum));
final FileStatus stat =
GridmixTestUtils.dfs.getFileStatus(
new Path(GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum)));
assertEquals("Wrong owner for " + jobName, spec.getUser(),
stat.getOwner());
final int nMaps = spec.getNumberMaps();
final int nReds = spec.getNumberReduces();
// TODO Blocked by MAPREDUCE-118
if (true) return;
// TODO
System.out.println(jobName + ": " + nMaps + "/" + nReds);
final TaskReport[] mReports =
client.getMapTaskReports(JobID.downgrade(job.getJobID()));
assertEquals("Mismatched map count", nMaps, mReports.length);
check(TaskType.MAP, job, spec, mReports,
0, 0, SLOPBYTES, nReds);
final TaskReport[] rReports =
client.getReduceTaskReports(JobID.downgrade(job.getJobID()));
assertEquals("Mismatched reduce count", nReds, rReports.length);
check(TaskType.REDUCE, job, spec, rReports,
nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
}
}
// Verify if correct job queue is used
private void verifyQueue(Configuration conf, String jobName) {
if (!conf.getBoolean(
GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
assertEquals(" Improper queue for " + jobName,
conf.get("mapred.job.queue.name"), "q1");
} else {
assertEquals(" Improper queue for " + jobName,
conf.get("mapred.job.queue.name"), "default");
}
}
public void check(final TaskType type, Job job, JobStory spec,
final TaskReport[] runTasks,
long extraInputBytes, int extraInputRecords,
long extraOutputBytes, int extraOutputRecords) throws Exception {
long[] runInputRecords = new long[runTasks.length];
long[] runInputBytes = new long[runTasks.length];
long[] runOutputRecords = new long[runTasks.length];
long[] runOutputBytes = new long[runTasks.length];
long[] specInputRecords = new long[runTasks.length];
long[] specInputBytes = new long[runTasks.length];
long[] specOutputRecords = new long[runTasks.length];
long[] specOutputBytes = new long[runTasks.length];
for (int i = 0; i < runTasks.length; ++i) {
final TaskInfo specInfo;
final Counters counters = runTasks[i].getCounters();
switch (type) {
case MAP:
runInputBytes[i] = counters.findCounter("FileSystemCounters",
"HDFS_BYTES_READ").getValue() -
counters.findCounter(SPLIT_RAW_BYTES).getValue();
runInputRecords[i] =
(int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
runOutputBytes[i] =
counters.findCounter(MAP_OUTPUT_BYTES).getValue();
runOutputRecords[i] =
(int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.MAP, i);
specInputRecords[i] = specInfo.getInputRecords();
specInputBytes[i] = specInfo.getInputBytes();
specOutputRecords[i] = specInfo.getOutputRecords();
specOutputBytes[i] = specInfo.getOutputBytes();
System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
specInputBytes[i], specOutputBytes[i],
specInputRecords[i], specOutputRecords[i]);
System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
runInputBytes[i], runOutputBytes[i],
runInputRecords[i], runOutputRecords[i]);
break;
case REDUCE:
runInputBytes[i] = 0;
runInputRecords[i] =
(int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
runOutputBytes[i] =
counters.findCounter("FileSystemCounters",
"HDFS_BYTES_WRITTEN").getValue();
runOutputRecords[i] =
(int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
// There is no reliable counter for reduce input bytes. The
// variable-length encoding of intermediate records and other noise
// make this quantity difficult to estimate. The shuffle and spec
// input bytes are included in debug output for reference, but are
// not checked
specInputBytes[i] = 0;
specInputRecords[i] = specInfo.getInputRecords();
specOutputRecords[i] = specInfo.getOutputRecords();
specOutputBytes[i] = specInfo.getOutputBytes();
System.out.printf(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
specInfo.getInputBytes(), specOutputBytes[i],
specInputRecords[i], specOutputRecords[i]);
System.out.printf(type + " RUN: (%9d) -> %9d :: %5d -> %5d\n",
counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
break;
default:
specInfo = null;
fail("Unexpected type: " + type);
}
}
// Check input bytes
Arrays.sort(specInputBytes);
Arrays.sort(runInputBytes);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " input bytes " +
specInputBytes[i] + "/" + runInputBytes[i],
eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
}
// Check input records
Arrays.sort(specInputRecords);
Arrays.sort(runInputRecords);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " input records " +
specInputRecords[i] + "/" + runInputRecords[i],
eqPlusMinus(runInputRecords[i], specInputRecords[i],
extraInputRecords));
}
// Check output bytes
Arrays.sort(specOutputBytes);
Arrays.sort(runOutputBytes);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " output bytes " +
specOutputBytes[i] + "/" + runOutputBytes[i],
eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
extraOutputBytes));
}
// Check output records
Arrays.sort(specOutputRecords);
Arrays.sort(runOutputRecords);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " output records " +
specOutputRecords[i] + "/" + runOutputRecords[i],
eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
extraOutputRecords));
}
}
private static boolean eqPlusMinus(long a, long b, long x) {
final long diff = Math.abs(a - b);
return diff <= x;
}
@Override
protected void onSuccess(Job job) {
retiredJobs.add(job);
}
@Override
protected void onFailure(Job job) {
fail("Job failure: " + job);
}
}
static class DebugGridmix extends Gridmix {
private JobFactory factory;
private TestMonitor monitor;
public void checkMonitor() throws Exception {
monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted());
}
@Override
protected JobMonitor createJobMonitor(Statistics stats) {
monitor = new TestMonitor(NJOBS + 1, stats);
return monitor;
}
@Override
protected JobFactory createJobFactory(JobSubmitter submitter,
String traceIn, Path scratchDir, Configuration conf,
CountDownLatch startFlag, UserResolver userResolver)
throws IOException {
factory = DebugJobFactory.getFactory(
submitter, scratchDir, NJOBS, conf, startFlag, userResolver);
return factory;
}
}
/**
* Verifies that the given {@code JobStory} corresponds to the checked-in
* WordCount {@code JobStory}. The verification is effected via JUnit
* assertions.
*
* @param js the candidate JobStory.
*/
private void verifyWordCountJobStory(JobStory js) {
assertNotNull("Null JobStory", js);
String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+ js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+ js.getNumberMaps() + ":" + js.getNumberReduces();
assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
}
/**
* Expands a file compressed using {@code gzip}.
*
* @param fs the {@code FileSystem} corresponding to the given
* file.
*
* @param in the path to the compressed file.
*
* @param out the path to the uncompressed output.
*
* @throws Exception if there was an error during the operation.
*/
private void expandGzippedTrace(FileSystem fs, Path in, Path out)
throws Exception {
byte[] buff = new byte[4096];
GZIPInputStream gis = new GZIPInputStream(fs.open(in));
FSDataOutputStream fsdos = fs.create(out);
int numRead;
while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
fsdos.write(buff, 0, numRead);
}
gis.close();
fsdos.close();
}
/**
* Tests the reading of traces in GridMix3. These traces are generated
* by Rumen and are in the JSON format. The traces can optionally be
* compressed and uncompressed traces can also be passed to GridMix3 via
* its standard input stream. The testing is effected via JUnit assertions.
*
* @throws Exception if there was an error.
*/
@Test
public void testTraceReader() throws Exception {
Configuration conf = new Configuration();
FileSystem lfs = FileSystem.getLocal(conf);
Path rootInputDir = new Path(System.getProperty("src.test.data"));
rootInputDir
= rootInputDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
Path rootTempDir
= new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")), "testTraceReader");
rootTempDir
= rootTempDir.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
InputStream origStdIn = System.in;
InputStream tmpIs = null;
try {
DebugGridmix dgm = new DebugGridmix();
JobStoryProducer jsp
= dgm.createJobStoryProducer(inputFile.toString(), conf);
System.out.println("Verifying JobStory from compressed trace...");
verifyWordCountJobStory(jsp.getNextJob());
expandGzippedTrace(lfs, inputFile, tempFile);
jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
System.out.println("Verifying JobStory from uncompressed trace...");
verifyWordCountJobStory(jsp.getNextJob());
tmpIs = lfs.open(tempFile);
System.setIn(tmpIs);
System.out.println("Verifying JobStory from trace in standard input...");
jsp = dgm.createJobStoryProducer("-", conf);
verifyWordCountJobStory(jsp.getNextJob());
} finally {
System.setIn(origStdIn);
if (tmpIs != null) {
tmpIs.close();
}
lfs.delete(rootTempDir, true);
}
}
@Test
public void testReplaySubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.REPLAY;
System.out.println(" Replay started at " + System.currentTimeMillis());
doSubmission(false, false);
System.out.println(" Replay ended at " + System.currentTimeMillis());
System.out.println(" Replay started with default output path at time "
+ System.currentTimeMillis());
doSubmission(false, true);
System.out.println(" Replay ended with default output path at time "
+ System.currentTimeMillis());
}
@Test
public void testStressSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.STRESS;
System.out.println(" Stress started at " + System.currentTimeMillis());
doSubmission(false, false);
System.out.println(" Stress ended at " + System.currentTimeMillis());
}
@Test
public void testStressSubmitWithDefaultQueue() throws Exception {
policy = GridmixJobSubmissionPolicy.STRESS;
System.out.println(" Stress with default q started at "
+ System.currentTimeMillis());
doSubmission(true, false);
System.out.println(" Stress with default q ended at "
+ System.currentTimeMillis());
}
@Test
public void testSerialSubmit() throws Exception {
policy = GridmixJobSubmissionPolicy.SERIAL;
System.out.println("Serial started at " + System.currentTimeMillis());
doSubmission(false, false);
System.out.println("Serial ended at " + System.currentTimeMillis());
}
private void doSubmission(boolean useDefaultQueue,
boolean defaultOutputPath) throws Exception {
final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
final Path root = new Path("/user");
Configuration conf = null;
try{
ArrayList<String> argsList = new ArrayList<String>();
argsList.add("-D" + FilePool.GRIDMIX_MIN_FILE + "=0");
argsList.add("-D" + Gridmix.GRIDMIX_USR_RSV + "="
+ EchoUserResolver.class.getName());
// Set the config property gridmix.output.directory only if
// defaultOutputPath is false. If defaultOutputPath is true, then
// let us allow gridmix to use the path foo/gridmix/ as output dir.
if (!defaultOutputPath) {
argsList.add("-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out);
}
argsList.add("-generate");
argsList.add(String.valueOf(GENDATA) + "m");
argsList.add(in.toString());
argsList.add("-"); // ignored by DebugGridmix
String[] argv = argsList.toArray(new String[argsList.size()]);
DebugGridmix client = new DebugGridmix();
conf = new Configuration();
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
if (useDefaultQueue) {
conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, false);
conf.set(GridmixJob.GRIDMIX_DEFAULT_QUEUE, "q1");
} else {
conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true);
}
conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
// allow synthetic users to create home directories
GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
} catch (Exception e) {
e.printStackTrace();
} finally {
in.getFileSystem(conf).delete(in, true);
out.getFileSystem(conf).delete(out, true);
root.getFileSystem(conf).delete(root,true);
}
}
}