/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
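/**
* Common driver for the Gridmix submission tests. It generates input data,
* runs Gridmix with a debug job factory against mock job stories, and
* verifies the submitted jobs against their original specs.
* <p>
* Subclasses (such as TestGridmixSubmission) typically pin the submission
* policy and call {@link #doSubmission}. A minimal sketch, assuming a JUnit
* runner; the subclass and test method names here are hypothetical:
* <pre>
* public class TestReplaySubmission extends CommonJobTest {
*   &#64;Test
*   public void testReplaySubmit() throws Exception {
*     policy = GridmixJobSubmissionPolicy.REPLAY;
*     doSubmission(null, false);
*   }
* }
* </pre>
*/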
public class CommonJobTest {
public static final Logger LOG = LoggerFactory.getLogger(CommonJobTest.class);
protected static int NJOBS = 2;
protected static final long GENDATA = 1; // in megabytes
protected static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
private static File workspace = new File("target" + File.separator
+ TestGridmixSubmission.class.getName() + "-test");
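/**
* Gridmix subclass that swaps in a mock job factory and a capturing job
* monitor so the test can verify exactly what was submitted.
*/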
static class DebugGridmix extends Gridmix {
private JobFactory<?> factory;
private TestMonitor monitor;
@Override
protected JobMonitor createJobMonitor(Statistics stats, Configuration conf)
throws IOException {
monitor = new TestMonitor(3, stats);
return monitor;
}
@Override
protected JobFactory<?> createJobFactory(JobSubmitter submitter,
String traceIn, Path scratchDir, Configuration conf,
CountDownLatch startFlag, UserResolver userResolver) throws IOException {
factory = DebugJobFactory.getFactory(submitter, scratchDir, NJOBS, conf,
startFlag, userResolver);
return factory;
}
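/** Verifies the monitored jobs against those the debug factory submitted. */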
public void checkMonitor() throws Exception {
monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted());
}
}
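/**
* Job monitor that collects successfully retired jobs and fails the test on
* the first job failure. {@link #verify} then compares each retired job
* against the JobStory spec it was generated from.
*/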
static class TestMonitor extends JobMonitor {
private final BlockingQueue<Job> retiredJobs;
private final int expected;
static final long SLOPBYTES = 1024;
public TestMonitor(int expected, Statistics stats) {
super(3, TimeUnit.SECONDS, stats, 1);
this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>();
}
@Override
protected void onSuccess(Job job) {
LOG.info(" Job Success " + job);
retiredJobs.add(job);
}
@Override
protected void onFailure(Job job) {
fail("Job failure: " + job);
}
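/**
* Checks every retired job against the submitted specs: the data-generation
* job's output size and HDFS_BYTES_WRITTEN counter, Gridmix job naming
* conventions, output directory ownership, and per-task input/output
* volumes (within the allowed slop) for map and reduce tasks.
*/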
public void verify(ArrayList<JobStory> submitted) throws Exception {
assertEquals("Bad job count", expected, retiredJobs.size());
final ArrayList<Job> succeeded = new ArrayList<Job>();
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
final HashMap<String, JobStory> sub = new HashMap<String, JobStory>();
for (JobStory spec : submitted) {
sub.put(spec.getJobID().toString(), spec);
}
for (Job job : succeeded) {
final String jobName = job.getJobName();
Configuration configuration = job.getConfiguration();
if (GenerateData.JOB_NAME.equals(jobName)) {
RemoteIterator<LocatedFileStatus> rit = GridmixTestUtils.dfs
.listFiles(new Path("/"), true);
while (rit.hasNext()) {
LOG.info(rit.next().toString());
}
final Path in = new Path("foo").makeQualified(
GridmixTestUtils.dfs.getUri(),
GridmixTestUtils.dfs.getWorkingDirectory());
// The data was compressed, so the bytes on disk are roughly half of the
// 1 MB of generated input, plus log files.
final ContentSummary generated = GridmixTestUtils.dfs
.getContentSummary(in);
assertEquals(550000, generated.getLength(), 10000);
Counter counter = job.getCounters()
.getGroup("org.apache.hadoop.mapreduce.FileSystemCounter")
.findCounter("HDFS_BYTES_WRITTEN");
assertEquals(generated.getLength(), counter.getValue());
continue;
} else if (GenerateDistCacheData.JOB_NAME.equals(jobName)) {
continue;
}
final String originalJobId = configuration.get(Gridmix.ORIGINAL_JOB_ID);
final JobStory spec = sub.get(originalJobId);
assertNotNull("No spec for " + jobName, spec);
assertNotNull("No counters for " + jobName, job.getCounters());
final String originalJobName = spec.getName();
System.out.println("originalJobName=" + originalJobName
+ ";GridmixJobName=" + jobName + ";originalJobID=" + originalJobId);
assertTrue("Original job name is wrong.",
originalJobName.equals(configuration.get(Gridmix.ORIGINAL_JOB_NAME)));
// Gridmix job seqNum contains 6 digits
int seqNumLength = 6;
String jobSeqNum = new DecimalFormat("000000").format(configuration.getInt(
GridmixJob.GRIDMIX_JOB_SEQ, -1));
// Original job name is of the format MOCKJOB<6 digit sequence number>
// because MockJob jobNames are of this format.
assertEquals("Original job name does not end with the sequence number.",
jobSeqNum,
originalJobName.substring(originalJobName.length() - seqNumLength));
assertEquals("Gridmix job name is not in the expected format.",
GridmixJob.JOB_NAME_PREFIX + jobSeqNum, jobName);
final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path(
GridmixTestUtils.DEST, "" + Integer.parseInt(jobSeqNum)));
assertEquals("Wrong owner for " + jobName, spec.getUser(),
stat.getOwner());
final int nMaps = spec.getNumberMaps();
final int nReds = spec.getNumberReduces();
final JobClient client = new JobClient(
GridmixTestUtils.mrvl.getConfig());
final TaskReport[] mReports = client.getMapTaskReports(JobID
.downgrade(job.getJobID()));
assertEquals("Mismatched map count", nMaps, mReports.length);
check(TaskType.MAP, spec, mReports, 0, 0, SLOPBYTES, nReds);
final TaskReport[] rReports = client.getReduceTaskReports(JobID
.downgrade(job.getJobID()));
assertEquals("Mismatched reduce count", nReds, rReports.length);
check(TaskType.REDUCE, spec, rReports, nMaps * SLOPBYTES, 2 * nMaps, 0,
0);
}
}
/**
* Compares the run-time task reports against the Rumen spec: per-task
* input/output bytes and records must match within the given slop.
*/
private void check(final TaskType type, JobStory spec,
final TaskReport[] runTasks, long extraInputBytes,
int extraInputRecords, long extraOutputBytes, int extraOutputRecords)
throws Exception {
long[] runInputRecords = new long[runTasks.length];
long[] runInputBytes = new long[runTasks.length];
long[] runOutputRecords = new long[runTasks.length];
long[] runOutputBytes = new long[runTasks.length];
long[] specInputRecords = new long[runTasks.length];
long[] specInputBytes = new long[runTasks.length];
long[] specOutputRecords = new long[runTasks.length];
long[] specOutputBytes = new long[runTasks.length];
for (int i = 0; i < runTasks.length; ++i) {
final TaskInfo specInfo;
final Counters counters = runTasks[i].getCounters();
switch (type) {
case MAP:
runInputBytes[i] = counters.findCounter("FileSystemCounters",
"HDFS_BYTES_READ").getValue()
- counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();
runInputRecords[i] = counters.findCounter(
TaskCounter.MAP_INPUT_RECORDS).getValue();
runOutputBytes[i] = counters
.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
runOutputRecords[i] = counters.findCounter(
TaskCounter.MAP_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.MAP, i);
specInputRecords[i] = specInfo.getInputRecords();
specInputBytes[i] = specInfo.getInputBytes();
specOutputRecords[i] = specInfo.getOutputRecords();
specOutputBytes[i] = specInfo.getOutputBytes();
LOG.info(String.format(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
specInputBytes[i], specOutputBytes[i], specInputRecords[i],
specOutputRecords[i]));
LOG.info(String.format(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
runInputBytes[i], runOutputBytes[i], runInputRecords[i],
runOutputRecords[i]));
break;
case REDUCE:
runInputBytes[i] = 0;
runInputRecords[i] = counters.findCounter(
TaskCounter.REDUCE_INPUT_RECORDS).getValue();
runOutputBytes[i] = counters.findCounter("FileSystemCounters",
"HDFS_BYTES_WRITTEN").getValue();
runOutputRecords[i] = counters.findCounter(
TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
// There is no reliable counter for reduce input bytes. The
// variable-length encoding of intermediate records and other noise
// make this quantity difficult to estimate. The shuffle and spec
// input bytes are included in debug output for reference, but are
// not checked
specInputBytes[i] = 0;
specInputRecords[i] = specInfo.getInputRecords();
specOutputRecords[i] = specInfo.getOutputRecords();
specOutputBytes[i] = specInfo.getOutputBytes();
LOG.info(String.format(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
specInfo.getInputBytes(), specOutputBytes[i],
specInputRecords[i], specOutputRecords[i]));
LOG.info(String
.format(type + " RUN: (%9d) -> %9d :: %5d -> %5d\n", counters
.findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue(),
runOutputBytes[i], runInputRecords[i], runOutputRecords[i]));
break;
default:
fail("Unexpected type: " + type);
}
}
// Check input bytes
Arrays.sort(specInputBytes);
Arrays.sort(runInputBytes);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " input bytes " + specInputBytes[i]
+ "/" + runInputBytes[i],
eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
}
// Check input records
Arrays.sort(specInputRecords);
Arrays.sort(runInputRecords);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue(
"Mismatched " + type + " input records " + specInputRecords[i]
+ "/" + runInputRecords[i],
eqPlusMinus(runInputRecords[i], specInputRecords[i],
extraInputRecords));
}
// Check output bytes
Arrays.sort(specOutputBytes);
Arrays.sort(runOutputBytes);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue(
"Mismatched " + type + " output bytes " + specOutputBytes[i] + "/"
+ runOutputBytes[i],
eqPlusMinus(runOutputBytes[i], specOutputBytes[i], extraOutputBytes));
}
// Check output records
Arrays.sort(specOutputRecords);
Arrays.sort(runOutputRecords);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue(
"Mismatched " + type + " output records " + specOutputRecords[i]
+ "/" + runOutputRecords[i],
eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
extraOutputRecords));
}
}
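/** Returns true iff a and b differ by at most the slop x. */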
private static boolean eqPlusMinus(long a, long b, long x) {
final long diff = Math.abs(a - b);
return diff <= x;
}
}
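/**
* Runs a complete Gridmix pass: generates GENDATA megabytes of compressed
* input under "foo", runs Gridmix with the configured submission policy
* (the trace argument "-" is ignored by DebugGridmix), and verifies the
* monitored jobs via {@link DebugGridmix#checkMonitor()}. A null
* jobCreatorName leaves the default job type; when defaultOutputPath is
* true, Gridmix is allowed to choose its own output directory.
*/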
protected void doSubmission(String jobCreatorName, boolean defaultOutputPath)
throws Exception {
final Path in = new Path("foo").makeQualified(
GridmixTestUtils.dfs.getUri(),
GridmixTestUtils.dfs.getWorkingDirectory());
final Path out = GridmixTestUtils.DEST.makeQualified(
GridmixTestUtils.dfs.getUri(),
GridmixTestUtils.dfs.getWorkingDirectory());
final Path root = new Path(workspace.getName()).makeQualified(
GridmixTestUtils.dfs.getUri(), GridmixTestUtils.dfs.getWorkingDirectory());
if (!workspace.exists()) {
assertTrue(workspace.mkdirs());
}
// Resolve the cluster configuration before the try block so the finally
// clause can always look up the file systems for cleanup.
Configuration conf = GridmixTestUtils.mrvl.getConfig();
try {
ArrayList<String> argsList = new ArrayList<String>();
argsList.add("-D" + FilePool.GRIDMIX_MIN_FILE + "=0");
argsList.add("-D" + Gridmix.GRIDMIX_USR_RSV + "="
+ EchoUserResolver.class.getName());
if (jobCreatorName != null) {
argsList.add("-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + jobCreatorName);
}
// Set the config property gridmix.output.directory only if
// defaultOutputPath is false. If defaultOutputPath is true, then
// let us allow gridmix to use the path foo/gridmix/ as output dir.
if (!defaultOutputPath) {
argsList.add("-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out);
}
argsList.add("-generate");
argsList.add(String.valueOf(GENDATA) + "m");
argsList.add(in.toString());
argsList.add("-"); // ignored by DebugGridmix
String[] argv = argsList.toArray(new String[argsList.size()]);
DebugGridmix client = new DebugGridmix();
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true);
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
conf.set(MRJobConfig.USER_NAME, ugi.getUserName());
// allow synthetic users to create home directories
GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));
GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777));
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
} catch (Exception e) {
LOG.error("Gridmix run failed", e);
throw e;
} finally {
in.getFileSystem(conf).delete(in, true);
out.getFileSystem(conf).delete(out, true);
root.getFileSystem(conf).delete(root, true);
}
}
}