MRUNIT-115: allow setting a separate JobConf/Configuration for the InputFormat that reads back the OutputFormat's results, or for copying the output when no real OutputFormat is used
Initial check-in; Javadoc still needs to be added.
git-svn-id: https://svn.apache.org/repos/asf/mrunit/trunk@1344232 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index 0ba6d1f..c8a47a0 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -217,6 +217,12 @@
return this;
}
+ public MapDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+ Configuration configuration) {
+ setOutputCopyingOrInputFormatConfiguration(configuration);
+ return this;
+ }
+
public MapDriver<K1, V1, K2, V2> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass,
final Class<? extends InputFormat> inputFormatClass) {
@@ -234,7 +240,8 @@
}
final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
- .createOutputCollectable(getConfiguration());
+ .createOutputCollectable(getConfiguration(),
+ getOutputCopyingOrInputFormatConfiguration());
final MockReporter reporter = new MockReporter(
MockReporter.ReporterType.Mapper, getCounters());
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
index 7d12dd3..a93c661 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
@@ -269,6 +269,12 @@
return this;
}
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputCopyingOrInputFormatConfiguration(
+ Configuration configuration) {
+ setOutputCopyingOrInputFormatConfiguration(configuration);
+ return this;
+ }
+
public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass,
final Class<? extends InputFormat> inputFormatClass) {
@@ -300,6 +306,10 @@
.newReduceDriver(reducer).withCounters(getCounters())
.withConfiguration(configuration).withInputKey(inputKey)
.withInputValues(inputValues);
+ if (getOutputCopyingOrInputFormatConfiguration() != null) {
+ reduceDriver
+ .withOutputCopyingOrInputFormatConfiguration(getOutputCopyingOrInputFormatConfiguration());
+ }
if (outputFormatClass != null) {
reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 1926cf6..f2190c1 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -227,6 +227,12 @@
return this;
}
+ public ReduceDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+ Configuration configuration) {
+ setOutputCopyingOrInputFormatConfiguration(configuration);
+ return this;
+ }
+
public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass,
final Class<? extends InputFormat> inputFormatClass) {
@@ -244,7 +250,8 @@
}
final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
- .createOutputCollectable(getConfiguration());
+ .createOutputCollectable(getConfiguration(),
+ getOutputCopyingOrInputFormatConfiguration());
final MockReporter reporter = new MockReporter(
MockReporter.ReporterType.Reducer, getCounters());
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index 05656ff..bbd8519 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -44,6 +44,7 @@
protected List<Pair<Pair<String, String>, Long>> expectedStringCounters;
protected Configuration configuration;
+ private Configuration outputCopyingOrInputFormatConf;
protected CounterWrapper counterWrapper;
@@ -143,6 +144,15 @@
this.configuration = returnNonNull(configuration);
}
+ public Configuration getOutputCopyingOrInputFormatConfiguration() {
+ return outputCopyingOrInputFormatConf;
+ }
+
+ public void setOutputCopyingOrInputFormatConfiguration(
+ final Configuration configuration) {
+ this.outputCopyingOrInputFormatConf = returnNonNull(configuration);
+ }
+
/**
* Runs the test but returns the result set instead of validating it (ignores
* any addOutput(), etc calls made before this)
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
index 54b3f63..1a29274 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
@@ -45,7 +45,8 @@
private static final String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
private static final TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
- private final JobConf conf;
+ private final JobConf outputFormatConf;
+ private final JobConf inputFormatConf;
private final File outputPath = new File(
System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
private final File outputFile = new File(outputPath, "part-00000");
@@ -54,13 +55,17 @@
private final OutputFormat outputFormat;
private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
- public MockMapredOutputFormat(JobConf conf,
+ public MockMapredOutputFormat(JobConf outputFormatConf,
Class<? extends OutputFormat> outputFormatClass,
- Class<? extends InputFormat> inputFormatClass) throws IOException {
- this.conf = conf;
+ Class<? extends InputFormat> inputFormatClass, JobConf inputFormatConf)
+ throws IOException {
+ this.outputFormatConf = outputFormatConf;
+ this.inputFormatConf = inputFormatConf;
- outputFormat = ReflectionUtils.newInstance(outputFormatClass, conf);
- inputFormat = ReflectionUtils.newInstance(inputFormatClass, conf);
+ outputFormat = ReflectionUtils.newInstance(outputFormatClass,
+ outputFormatConf);
+ inputFormat = ReflectionUtils
+ .newInstance(inputFormatClass, inputFormatConf);
if (outputPath.exists()) {
throw new IllegalStateException(
@@ -69,14 +74,15 @@
if (!outputPath.mkdir()) {
throw new IOException("Failed to create output dir " + outputPath);
}
- FileOutputFormat.setOutputPath(conf, new Path(outputPath.toString()));
- conf.set("mapred.task.id", TASK_ID.toString());
- FileSystem.getLocal(conf).mkdirs(
+ FileOutputFormat.setOutputPath(outputFormatConf,
+ new Path(outputPath.toString()));
+ outputFormatConf.set("mapred.task.id", TASK_ID.toString());
+ FileSystem.getLocal(outputFormatConf).mkdirs(
new Path(outputPath.toString(), FileOutputCommitter.TEMP_DIR_NAME));
}
private void setClassIfUnset(String name, Class<?> classType) {
- conf.setIfUnset(name, classType.getName());
+ outputFormatConf.setIfUnset(name, classType.getName());
}
@Override
@@ -87,8 +93,9 @@
setClassIfUnset("mapred.output.key.class", key.getClass());
setClassIfUnset("mapred.output.value.class", value.getClass());
- recordWriter = outputFormat.getRecordWriter(FileSystem.getLocal(conf),
- conf, outputFile.getName(), Reporter.NULL);
+ recordWriter = outputFormat.getRecordWriter(
+ FileSystem.getLocal(outputFormatConf), outputFormatConf,
+ outputFile.getName(), Reporter.NULL);
}
recordWriter.write(key, value);
@@ -98,10 +105,10 @@
public List<Pair<K, V>> getOutputs() throws IOException {
recordWriter.close(Reporter.NULL);
- FileInputFormat.setInputPaths(conf, outputPath + "/*/*/*/*");
- for (InputSplit inputSplit : inputFormat.getSplits(conf, 1)) {
+ FileInputFormat.setInputPaths(inputFormatConf, outputPath + "/*/*/*/*");
+ for (InputSplit inputSplit : inputFormat.getSplits(inputFormatConf, 1)) {
final RecordReader<K, V> recordReader = inputFormat.getRecordReader(
- inputSplit, conf, Reporter.NULL);
+ inputSplit, inputFormatConf, Reporter.NULL);
K key = recordReader.createKey();
V value = recordReader.createValue();
while (recordReader.next(key, value)) {
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
index 24a7fad..e71dd49 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
@@ -53,26 +53,27 @@
private static final Class<?>[] JOB_CONTEXT_CLASSES = new Class<?>[] {
Configuration.class, JobID.class };
- private final Job job;
+ private final Job outputFormatJob;
+ private final Job inputFormatJob;
private final File outputPath = new File(
System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
private TaskAttemptContext taskAttemptContext;
private RecordWriter recordWriter;
private final InputFormat inputFormat;
private final OutputFormat outputFormat;
- private final Serialization serialization;
private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
- public MockMapreduceOutputFormat(Job job,
+ public MockMapreduceOutputFormat(Job outputFormatJob,
Class<? extends OutputFormat> outputFormatClass,
- Class<? extends InputFormat> inputFormatClass) throws IOException {
- this.job = job;
+ Class<? extends InputFormat> inputFormatClass, Job inputFormatJob)
+ throws IOException {
+ this.outputFormatJob = outputFormatJob;
+ this.inputFormatJob = inputFormatJob;
outputFormat = ReflectionUtils.newInstance(outputFormatClass,
- job.getConfiguration());
+ outputFormatJob.getConfiguration());
inputFormat = ReflectionUtils.newInstance(inputFormatClass,
- job.getConfiguration());
- serialization = new Serialization((job.getConfiguration()));
+ inputFormatJob.getConfiguration());
if (outputPath.exists()) {
throw new IllegalStateException(
@@ -81,11 +82,12 @@
if (!outputPath.mkdir()) {
throw new IOException("Failed to create output dir " + outputPath);
}
- FileOutputFormat.setOutputPath(job, new Path(outputPath.toString()));
+ FileOutputFormat.setOutputPath(outputFormatJob,
+ new Path(outputPath.toString()));
}
private void setClassIfUnset(String name, Class<?> classType) {
- job.getConfiguration().setIfUnset(name, classType.getName());
+ outputFormatJob.getConfiguration().setIfUnset(name, classType.getName());
}
private Object createObject(String primaryClassName,
@@ -129,7 +131,8 @@
taskAttemptContext = (TaskAttemptContext) createObject(
"org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
"org.apache.hadoop.mapreduce.TaskAttemptContext",
- TASK_ATTEMPT_CONTEXT_CLASSES, job.getConfiguration(), TASK_ID);
+ TASK_ATTEMPT_CONTEXT_CLASSES, outputFormatJob.getConfiguration(),
+ TASK_ID);
recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
}
@@ -147,13 +150,15 @@
throw new IOException(e);
}
- FileInputFormat.setInputPaths(job, outputPath + "/*/*/*/*");
+ final Serialization serialization = new Serialization(
+ inputFormatJob.getConfiguration());
+ FileInputFormat.setInputPaths(inputFormatJob, outputPath + "/*/*/*/*");
try {
List<InputSplit> inputSplits = inputFormat
.getSplits((JobContext) createObject(
"org.apache.hadoop.mapreduce.task.JobContextImpl",
"org.apache.hadoop.mapreduce.JobContext", JOB_CONTEXT_CLASSES,
- job.getConfiguration(), new JobID()));
+ inputFormatJob.getConfiguration(), new JobID()));
for (InputSplit inputSplit : inputSplits) {
RecordReader<K, V> recordReader = inputFormat.createRecordReader(
inputSplit, taskAttemptContext);
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
index 8ea2497..396b715 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
@@ -50,16 +50,22 @@
mapreduceInputFormatClass = returnNonNull(inputFormatClass);
}
- public OutputCollectable<K, V> createOutputCollectable(Configuration conf)
- throws IOException {
+ public OutputCollectable<K, V> createOutputCollectable(
+ Configuration configuration,
+ Configuration outputCopyingOrInputFormatConfiguration) throws IOException {
+ outputCopyingOrInputFormatConfiguration = outputCopyingOrInputFormatConfiguration == null ? configuration
+ : outputCopyingOrInputFormatConfiguration;
if (mapredOutputFormatClass != null) {
- return new MockMapredOutputFormat<K, V>(new JobConf(conf),
- mapredOutputFormatClass, mapredInputFormatClass);
+ return new MockMapredOutputFormat<K, V>(new JobConf(configuration),
+ mapredOutputFormatClass, mapredInputFormatClass, new JobConf(
+ outputCopyingOrInputFormatConfiguration));
}
if (mapreduceOutputFormatClass != null) {
- return new MockMapreduceOutputFormat<K, V>(new Job(conf),
- mapreduceOutputFormatClass, mapreduceInputFormatClass);
+ return new MockMapreduceOutputFormat<K, V>(new Job(configuration),
+ mapreduceOutputFormatClass, mapreduceInputFormatClass, new Job(
+ outputCopyingOrInputFormatConfiguration));
}
- return new MockOutputCollector<K, V>(conf);
+ return new MockOutputCollector<K, V>(
+ outputCopyingOrInputFormatConfiguration);
}
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
index 1e690ba..13276f0 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
@@ -203,6 +203,12 @@
return this;
}
+ public MapDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+ Configuration configuration) {
+ setOutputCopyingOrInputFormatConfiguration(configuration);
+ return this;
+ }
+
public MapDriver<K1, V1, K2, V2> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass,
final Class<? extends InputFormat> inputFormatClass) {
@@ -224,7 +230,8 @@
try {
final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
- .createOutputCollectable(getConfiguration());
+ .createOutputCollectable(getConfiguration(),
+ getOutputCopyingOrInputFormatConfiguration());
final MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(
inputs, getCounters(), getConfiguration(), outputCollectable);
myMapper.run(wrapper.getMockContext());
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index eaa0e76..559f64f 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -267,6 +267,12 @@
return this;
}
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputCopyingOrInputFormatConfiguration(
+ Configuration configuration) {
+ setOutputCopyingOrInputFormatConfiguration(configuration);
+ return this;
+ }
+
public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass,
final Class<? extends InputFormat> inputFormatClass) {
@@ -298,6 +304,10 @@
.newReduceDriver(reducer).withCounters(getCounters())
.withConfiguration(configuration).withInputKey(inputKey)
.withInputValues(inputValues);
+ if (getOutputCopyingOrInputFormatConfiguration() != null) {
+ reduceDriver
+ .withOutputCopyingOrInputFormatConfiguration(getOutputCopyingOrInputFormatConfiguration());
+ }
if (outputFormatClass != null) {
reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index 4649d15..0bd672b 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -217,6 +217,12 @@
return this;
}
+ public ReduceDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+ Configuration configuration) {
+ setOutputCopyingOrInputFormatConfiguration(configuration);
+ return this;
+ }
+
public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass,
final Class<? extends InputFormat> inputFormatClass) {
@@ -238,7 +244,8 @@
try {
final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
- .createOutputCollectable(getConfiguration());
+ .createOutputCollectable(getConfiguration(),
+ getOutputCopyingOrInputFormatConfiguration());
final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
inputs, getCounters(), getConfiguration(), outputCollectable);
myReducer.run(wrapper.getMockContext());