/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.mapreduce.TaskCounter.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.log4j.Level;
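/**
 * End-to-end test of Gridmix job submission. Generates input data on a
 * mini DFS/MR cluster, replays a fixed number of synthetic jobs, and
 * verifies the completed jobs' task counters against their
 * {@link JobStory} specifications.
 */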
public class TestGridmixSubmission {
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
).getLogger().setLevel(Level.DEBUG);
}
private static FileSystem dfs = null;
private static MiniDFSCluster dfsCluster = null;
private static MiniMRCluster mrCluster = null;
private static final int NJOBS = 2;
private static final long GENDATA = 50; // in megabytes
private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
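  /**
   * Bring up a three-node DFS and MR mini cluster. Job retirement is
   * disabled and completed-job status is persisted so that task reports
   * are still available when the monitor verifies the finished jobs.
   */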
@BeforeClass
public static void initCluster() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
dfsCluster = new MiniDFSCluster(conf, 3, true, null);
dfs = dfsCluster.getFileSystem();
mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
new JobConf(conf));
}
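  /** Tear down the mini clusters started in {@link #initCluster()}. */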
@AfterClass
public static void shutdownCluster() throws IOException {
if (mrCluster != null) {
mrCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
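  /**
   * JobMonitor that queues each successfully completed job and fails the
   * test on any job failure; {@link #verify(ArrayList)} then checks the
   * completed jobs against the specs that were submitted.
   */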
static class TestMonitor extends JobMonitor {
static final long SLOPBYTES = 1024;
private final int expected;
private final BlockingQueue<Job> retiredJobs;
public TestMonitor(int expected) {
super();
this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>();
}
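    /**
     * Match each completed job to its submitted spec by name. The data
     * generation job is checked for output size only; every synthetic job
     * is checked for task counts and per-task counter values within slop.
     */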
public void verify(ArrayList<JobStory> submitted) throws Exception {
final ArrayList<Job> succeeded = new ArrayList<Job>();
assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
for (JobStory spec : submitted) {
sub.put(spec.getName(), spec);
}
for (Job job : succeeded) {
final String jobname = job.getJobName();
if ("GRIDMIX_GENDATA".equals(jobname)) {
final Path in = new Path("foo").makeQualified(dfs);
final Path out = new Path("/gridmix").makeQualified(dfs);
final ContentSummary generated = dfs.getContentSummary(in);
assertTrue("Mismatched data gen", // +/- 100k for logs
(GENDATA << 20) < generated.getLength() + GENSLOP ||
(GENDATA << 20) > generated.getLength() - GENSLOP);
FileStatus[] outstat = dfs.listStatus(out);
assertEquals("Mismatched job count", NJOBS, outstat.length);
continue;
}
final JobStory spec =
sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
assertNotNull("No spec for " + job.getJobName(), spec);
assertNotNull("No counters for " + job.getJobName(), job.getCounters());
final int nMaps = spec.getNumberMaps();
final int nReds = spec.getNumberReduces();
System.out.println(jobname + ": " + nMaps + "/" + nReds);
final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
assertEquals("Mismatched map count", nMaps, mReports.length);
check(TaskType.MAP, job, spec, mReports,
0, 0, SLOPBYTES, nReds);
final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
assertEquals("Mismatched reduce count", nReds, rReports.length);
check(TaskType.REDUCE, job, spec, rReports,
nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
}
}
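    /**
     * Compare per-task counters of the completed tasks against the spec's
     * TaskInfo values. Both sides are sorted before comparison, so the
     * check is insensitive to task scheduling order; each measure may
     * deviate by at most the corresponding "extra" slop argument.
     */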
public void check(final TaskType type, Job job, JobStory spec,
final TaskReport[] runTasks,
long extraInputBytes, int extraInputRecords,
long extraOutputBytes, int extraOutputRecords) throws Exception {
long[] runInputRecords = new long[runTasks.length];
long[] runInputBytes = new long[runTasks.length];
long[] runOutputRecords = new long[runTasks.length];
long[] runOutputBytes = new long[runTasks.length];
long[] specInputRecords = new long[runTasks.length];
long[] specInputBytes = new long[runTasks.length];
long[] specOutputRecords = new long[runTasks.length];
long[] specOutputBytes = new long[runTasks.length];
for (int i = 0; i < runTasks.length; ++i) {
final TaskInfo specInfo;
final Counters counters = runTasks[i].getTaskCounters();
switch (type) {
        case MAP:
          // Exclude the serialized input split from the input byte count;
          // the task reads it from HDFS along with the actual input data.
          runInputBytes[i] = counters.findCounter("FileSystemCounters",
              "HDFS_BYTES_READ").getValue() -
              counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();
          runInputRecords[i] =
            counters.findCounter(MAP_INPUT_RECORDS).getValue();
          runOutputBytes[i] =
            counters.findCounter(MAP_OUTPUT_BYTES).getValue();
          runOutputRecords[i] =
            counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.MAP, i);
specInputRecords[i] = specInfo.getInputRecords();
specInputBytes[i] = specInfo.getInputBytes();
specOutputRecords[i] = specInfo.getOutputRecords();
specOutputBytes[i] = specInfo.getOutputBytes();
          System.out.printf("%s SPEC: %9d -> %9d :: %5d -> %5d\n", type,
              specInputBytes[i], specOutputBytes[i],
              specInputRecords[i], specOutputRecords[i]);
          System.out.printf("%s RUN: %9d -> %9d :: %5d -> %5d\n", type,
              runInputBytes[i], runOutputBytes[i],
              runInputRecords[i], runOutputRecords[i]);
break;
case REDUCE:
runInputBytes[i] = 0;
          runInputRecords[i] =
            counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
          runOutputBytes[i] =
            counters.findCounter("FileSystemCounters",
                "HDFS_BYTES_WRITTEN").getValue();
          runOutputRecords[i] =
            counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
// There is no reliable counter for reduce input bytes. The
// variable-length encoding of intermediate records and other noise
// make this quantity difficult to estimate. The shuffle and spec
// input bytes are included in debug output for reference, but are
// not checked
specInputBytes[i] = 0;
specInputRecords[i] = specInfo.getInputRecords();
specOutputRecords[i] = specInfo.getOutputRecords();
specOutputBytes[i] = specInfo.getOutputBytes();
          System.out.printf("%s SPEC: (%9d) -> %9d :: %5d -> %5d\n", type,
              specInfo.getInputBytes(), specOutputBytes[i],
              specInputRecords[i], specOutputRecords[i]);
          System.out.printf("%s RUN: (%9d) -> %9d :: %5d -> %5d\n", type,
              counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
              runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
break;
default:
specInfo = null;
fail("Unexpected type: " + type);
}
}
// Check input bytes
Arrays.sort(specInputBytes);
Arrays.sort(runInputBytes);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " input bytes " +
specInputBytes[i] + "/" + runInputBytes[i],
eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
}
// Check input records
Arrays.sort(specInputRecords);
Arrays.sort(runInputRecords);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " input records " +
specInputRecords[i] + "/" + runInputRecords[i],
eqPlusMinus(runInputRecords[i], specInputRecords[i],
extraInputRecords));
}
// Check output bytes
Arrays.sort(specOutputBytes);
Arrays.sort(runOutputBytes);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " output bytes " +
specOutputBytes[i] + "/" + runOutputBytes[i],
eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
extraOutputBytes));
}
// Check output records
Arrays.sort(specOutputRecords);
Arrays.sort(runOutputRecords);
for (int i = 0; i < runTasks.length; ++i) {
assertTrue("Mismatched " + type + " output records " +
specOutputRecords[i] + "/" + runOutputRecords[i],
eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
extraOutputRecords));
}
}
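    /** @return true iff {@code a} and {@code b} differ by at most {@code x}. */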
private static boolean eqPlusMinus(long a, long b, long x) {
final long diff = Math.abs(a - b);
return diff <= x;
}
@Override
protected void onSuccess(Job job) {
retiredJobs.add(job);
}
@Override
protected void onFailure(Job job) {
fail("Job failure: " + job);
}
}
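  /**
   * Gridmix subclass that substitutes a {@link TestMonitor} and a
   * DebugJobFactory, giving the test access to both the submitted specs
   * and the completed jobs.
   */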
static class DebugGridmix extends Gridmix {
private DebugJobFactory factory;
private TestMonitor monitor;
public void checkMonitor() throws Exception {
monitor.verify(factory.getSubmitted());
}
@Override
protected JobMonitor createJobMonitor() {
monitor = new TestMonitor(NJOBS + 1); // include data generation job
return monitor;
}
@Override
protected JobFactory createJobFactory(JobSubmitter submitter,
String traceIn, Path scratchDir, Configuration conf,
CountDownLatch startFlag) throws IOException {
factory =
new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag);
return factory;
}
}
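  /**
   * Drive the Gridmix client end to end: generate GENDATA megabytes of
   * input under "foo", run the synthetic jobs with output under
   * "/gridmix", then verify every completed job against its spec.
   */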
@Test
public void testSubmit() throws Exception {
final Path in = new Path("foo").makeQualified(dfs);
final Path out = new Path("/gridmix").makeQualified(dfs);
final String[] argv = {
"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
"-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
"-generate", String.valueOf(GENDATA) + "m",
in.toString(),
"-" // ignored by DebugGridmix
};
DebugGridmix client = new DebugGridmix();
final Configuration conf = mrCluster.createJobConf();
//conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
}
}