MRUNIT-115: allow a JobConf specific to the InputFormat that reads back the OutputFormat's results, or to the output copying used when no real OutputFormat is configured

Initial check-in; Javadoc still needs to be added.

git-svn-id: https://svn.apache.org/repos/asf/mrunit/trunk@1344232 13f79535-47bb-0310-9956-ffa450edef68
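For reference (not part of this change set), a minimal usage sketch of the new builder
method on the classic-API MapDriver. It assumes the existing newMapDriver,
withConfiguration and withOutputFormat builders; the IdentityMapper round-trip through
the SequenceFile formats and the split-size tweak are purely illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mrunit.MapDriver;

    public class OutputCopyingConfExample {
      public void run() throws Exception {
        // Configuration driving the mapper and the OutputFormat that writes its results.
        Configuration outputConf = new Configuration();

        // Separate configuration handed to the InputFormat that reads the written
        // results back for verification (or to the plain output copier when no real
        // OutputFormat is configured).
        Configuration readBackConf = new Configuration();
        readBackConf.setLong("mapred.min.split.size", 1L); // illustrative tweak only

        MapDriver<Text, Text, Text, Text> driver =
            MapDriver.newMapDriver(new IdentityMapper<Text, Text>());
        driver.withConfiguration(outputConf)
            .withOutputFormat(SequenceFileOutputFormat.class,
                SequenceFileInputFormat.class)
            .withOutputCopyingOrInputFormatConfiguration(readBackConf)
            .withInput(new Text("k"), new Text("v"))
            .withOutput(new Text("k"), new Text("v"))
            .runTest();
      }
    }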
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index 0ba6d1f..c8a47a0 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -217,6 +217,12 @@
     return this;
   }
 
+  public MapDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+      Configuration configuration) {
+    setOutputCopyingOrInputFormatConfiguration(configuration);
+    return this;
+  }
+
   public MapDriver<K1, V1, K2, V2> withOutputFormat(
       final Class<? extends OutputFormat> outputFormatClass,
       final Class<? extends InputFormat> inputFormatClass) {
@@ -234,7 +240,8 @@
     }
 
     final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-        .createOutputCollectable(getConfiguration());
+        .createOutputCollectable(getConfiguration(),
+            getOutputCopyingOrInputFormatConfiguration());
     final MockReporter reporter = new MockReporter(
         MockReporter.ReporterType.Mapper, getCounters());
 
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
index 7d12dd3..a93c661 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
@@ -269,6 +269,12 @@
     return this;
   }
 
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputCopyingOrInputFormatConfiguration(
+      Configuration configuration) {
+    setOutputCopyingOrInputFormatConfiguration(configuration);
+    return this;
+  }
+
   public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
       final Class<? extends OutputFormat> outputFormatClass,
       final Class<? extends InputFormat> inputFormatClass) {
@@ -300,6 +306,10 @@
             .newReduceDriver(reducer).withCounters(getCounters())
             .withConfiguration(configuration).withInputKey(inputKey)
             .withInputValues(inputValues);
+        if (getOutputCopyingOrInputFormatConfiguration() != null) {
+          reduceDriver
+              .withOutputCopyingOrInputFormatConfiguration(getOutputCopyingOrInputFormatConfiguration());
+        }
         if (outputFormatClass != null) {
           reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
         }
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 1926cf6..f2190c1 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -227,6 +227,12 @@
     return this;
   }
 
+  public ReduceDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+      Configuration configuration) {
+    setOutputCopyingOrInputFormatConfiguration(configuration);
+    return this;
+  }
+
   public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
       final Class<? extends OutputFormat> outputFormatClass,
       final Class<? extends InputFormat> inputFormatClass) {
@@ -244,7 +250,8 @@
     }
 
     final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-        .createOutputCollectable(getConfiguration());
+        .createOutputCollectable(getConfiguration(),
+            getOutputCopyingOrInputFormatConfiguration());
     final MockReporter reporter = new MockReporter(
         MockReporter.ReporterType.Reducer, getCounters());
 
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index 05656ff..bbd8519 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -44,6 +44,7 @@
   protected List<Pair<Pair<String, String>, Long>> expectedStringCounters;
 
   protected Configuration configuration;
+  private Configuration outputCopyingOrInputFormatConf;
 
   protected CounterWrapper counterWrapper;
 
@@ -143,6 +144,15 @@
     this.configuration = returnNonNull(configuration);
   }
 
+  public Configuration getOutputCopyingOrInputFormatConfiguration() {
+    return outputCopyingOrInputFormatConf;
+  }
+
+  public void setOutputCopyingOrInputFormatConfiguration(
+      final Configuration configuration) {
+    this.outputCopyingOrInputFormatConf = returnNonNull(configuration);
+  }
+
   /**
    * Runs the test but returns the result set instead of validating it (ignores
    * any addOutput(), etc calls made before this)
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
index 54b3f63..1a29274 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
@@ -45,7 +45,8 @@
   private static final String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
   private static final TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
 
-  private final JobConf conf;
+  private final JobConf outputFormatConf;
+  private final JobConf inputFormatConf;
   private final File outputPath = new File(
       System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
   private final File outputFile = new File(outputPath, "part-00000");
@@ -54,13 +55,17 @@
   private final OutputFormat outputFormat;
   private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
 
-  public MockMapredOutputFormat(JobConf conf,
+  public MockMapredOutputFormat(JobConf outputFormatConf,
       Class<? extends OutputFormat> outputFormatClass,
-      Class<? extends InputFormat> inputFormatClass) throws IOException {
-    this.conf = conf;
+      Class<? extends InputFormat> inputFormatClass, JobConf inputFormatConf)
+      throws IOException {
+    this.outputFormatConf = outputFormatConf;
+    this.inputFormatConf = inputFormatConf;
 
-    outputFormat = ReflectionUtils.newInstance(outputFormatClass, conf);
-    inputFormat = ReflectionUtils.newInstance(inputFormatClass, conf);
+    outputFormat = ReflectionUtils.newInstance(outputFormatClass,
+        outputFormatConf);
+    inputFormat = ReflectionUtils
+        .newInstance(inputFormatClass, inputFormatConf);
 
     if (outputPath.exists()) {
       throw new IllegalStateException(
@@ -69,14 +74,15 @@
     if (!outputPath.mkdir()) {
       throw new IOException("Failed to create output dir " + outputPath);
     }
-    FileOutputFormat.setOutputPath(conf, new Path(outputPath.toString()));
-    conf.set("mapred.task.id", TASK_ID.toString());
-    FileSystem.getLocal(conf).mkdirs(
+    FileOutputFormat.setOutputPath(outputFormatConf,
+        new Path(outputPath.toString()));
+    outputFormatConf.set("mapred.task.id", TASK_ID.toString());
+    FileSystem.getLocal(outputFormatConf).mkdirs(
         new Path(outputPath.toString(), FileOutputCommitter.TEMP_DIR_NAME));
   }
 
   private void setClassIfUnset(String name, Class<?> classType) {
-    conf.setIfUnset(name, classType.getName());
+    outputFormatConf.setIfUnset(name, classType.getName());
   }
 
   @Override
@@ -87,8 +93,9 @@
       setClassIfUnset("mapred.output.key.class", key.getClass());
       setClassIfUnset("mapred.output.value.class", value.getClass());
 
-      recordWriter = outputFormat.getRecordWriter(FileSystem.getLocal(conf),
-          conf, outputFile.getName(), Reporter.NULL);
+      recordWriter = outputFormat.getRecordWriter(
+          FileSystem.getLocal(outputFormatConf), outputFormatConf,
+          outputFile.getName(), Reporter.NULL);
     }
 
     recordWriter.write(key, value);
@@ -98,10 +105,10 @@
   public List<Pair<K, V>> getOutputs() throws IOException {
     recordWriter.close(Reporter.NULL);
 
-    FileInputFormat.setInputPaths(conf, outputPath + "/*/*/*/*");
-    for (InputSplit inputSplit : inputFormat.getSplits(conf, 1)) {
+    FileInputFormat.setInputPaths(inputFormatConf, outputPath + "/*/*/*/*");
+    for (InputSplit inputSplit : inputFormat.getSplits(inputFormatConf, 1)) {
       final RecordReader<K, V> recordReader = inputFormat.getRecordReader(
-          inputSplit, conf, Reporter.NULL);
+          inputSplit, inputFormatConf, Reporter.NULL);
       K key = recordReader.createKey();
       V value = recordReader.createValue();
       while (recordReader.next(key, value)) {
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
index 24a7fad..e71dd49 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
@@ -53,26 +53,27 @@
   private static final Class<?>[] JOB_CONTEXT_CLASSES = new Class<?>[] {
       Configuration.class, JobID.class };
 
-  private final Job job;
+  private final Job outputFormatJob;
+  private final Job inputFormatJob;
   private final File outputPath = new File(
       System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
   private TaskAttemptContext taskAttemptContext;
   private RecordWriter recordWriter;
   private final InputFormat inputFormat;
   private final OutputFormat outputFormat;
-  private final Serialization serialization;
   private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
 
-  public MockMapreduceOutputFormat(Job job,
+  public MockMapreduceOutputFormat(Job outputFormatJob,
       Class<? extends OutputFormat> outputFormatClass,
-      Class<? extends InputFormat> inputFormatClass) throws IOException {
-    this.job = job;
+      Class<? extends InputFormat> inputFormatClass, Job inputFormatJob)
+      throws IOException {
+    this.outputFormatJob = outputFormatJob;
+    this.inputFormatJob = inputFormatJob;
 
     outputFormat = ReflectionUtils.newInstance(outputFormatClass,
-        job.getConfiguration());
+        outputFormatJob.getConfiguration());
     inputFormat = ReflectionUtils.newInstance(inputFormatClass,
-        job.getConfiguration());
-    serialization = new Serialization((job.getConfiguration()));
+        inputFormatJob.getConfiguration());
 
     if (outputPath.exists()) {
       throw new IllegalStateException(
@@ -81,11 +82,12 @@
     if (!outputPath.mkdir()) {
       throw new IOException("Failed to create output dir " + outputPath);
     }
-    FileOutputFormat.setOutputPath(job, new Path(outputPath.toString()));
+    FileOutputFormat.setOutputPath(outputFormatJob,
+        new Path(outputPath.toString()));
   }
 
   private void setClassIfUnset(String name, Class<?> classType) {
-    job.getConfiguration().setIfUnset(name, classType.getName());
+    outputFormatJob.getConfiguration().setIfUnset(name, classType.getName());
   }
 
   private Object createObject(String primaryClassName,
@@ -129,7 +131,8 @@
         taskAttemptContext = (TaskAttemptContext) createObject(
             "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
             "org.apache.hadoop.mapreduce.TaskAttemptContext",
-            TASK_ATTEMPT_CONTEXT_CLASSES, job.getConfiguration(), TASK_ID);
+            TASK_ATTEMPT_CONTEXT_CLASSES, outputFormatJob.getConfiguration(),
+            TASK_ID);
         recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
       }
 
@@ -147,13 +150,15 @@
       throw new IOException(e);
     }
 
-    FileInputFormat.setInputPaths(job, outputPath + "/*/*/*/*");
+    final Serialization serialization = new Serialization(
+        inputFormatJob.getConfiguration());
+    FileInputFormat.setInputPaths(inputFormatJob, outputPath + "/*/*/*/*");
     try {
       List<InputSplit> inputSplits = inputFormat
           .getSplits((JobContext) createObject(
               "org.apache.hadoop.mapreduce.task.JobContextImpl",
               "org.apache.hadoop.mapreduce.JobContext", JOB_CONTEXT_CLASSES,
-              job.getConfiguration(), new JobID()));
+              inputFormatJob.getConfiguration(), new JobID()));
       for (InputSplit inputSplit : inputSplits) {
         RecordReader<K, V> recordReader = inputFormat.createRecordReader(
             inputSplit, taskAttemptContext);
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
index 8ea2497..396b715 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
@@ -50,16 +50,22 @@
     mapreduceInputFormatClass = returnNonNull(inputFormatClass);
   }
 
-  public OutputCollectable<K, V> createOutputCollectable(Configuration conf)
-      throws IOException {
+  public OutputCollectable<K, V> createOutputCollectable(
+      Configuration configuration,
+      Configuration outputCopyingOrInputFormatConfiguration) throws IOException {
+    outputCopyingOrInputFormatConfiguration = outputCopyingOrInputFormatConfiguration == null ? configuration
+        : outputCopyingOrInputFormatConfiguration;
     if (mapredOutputFormatClass != null) {
-      return new MockMapredOutputFormat<K, V>(new JobConf(conf),
-          mapredOutputFormatClass, mapredInputFormatClass);
+      return new MockMapredOutputFormat<K, V>(new JobConf(configuration),
+          mapredOutputFormatClass, mapredInputFormatClass, new JobConf(
+              outputCopyingOrInputFormatConfiguration));
     }
     if (mapreduceOutputFormatClass != null) {
-      return new MockMapreduceOutputFormat<K, V>(new Job(conf),
-          mapreduceOutputFormatClass, mapreduceInputFormatClass);
+      return new MockMapreduceOutputFormat<K, V>(new Job(configuration),
+          mapreduceOutputFormatClass, mapreduceInputFormatClass, new Job(
+              outputCopyingOrInputFormatConfiguration));
     }
-    return new MockOutputCollector<K, V>(conf);
+    return new MockOutputCollector<K, V>(
+        outputCopyingOrInputFormatConfiguration);
   }
 }
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
index 1e690ba..13276f0 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
@@ -203,6 +203,12 @@
     return this;
   }
 
+  public MapDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+      Configuration configuration) {
+    setOutputCopyingOrInputFormatConfiguration(configuration);
+    return this;
+  }
+
   public MapDriver<K1, V1, K2, V2> withOutputFormat(
       final Class<? extends OutputFormat> outputFormatClass,
       final Class<? extends InputFormat> inputFormatClass) {
@@ -224,7 +230,8 @@
 
     try {
       final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-          .createOutputCollectable(getConfiguration());
+          .createOutputCollectable(getConfiguration(),
+              getOutputCopyingOrInputFormatConfiguration());
       final MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(
           inputs, getCounters(), getConfiguration(), outputCollectable);
       myMapper.run(wrapper.getMockContext());
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index eaa0e76..559f64f 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -267,6 +267,12 @@
     return this;
   }
 
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputCopyingOrInputFormatConfiguration(
+      Configuration configuration) {
+    setOutputCopyingOrInputFormatConfiguration(configuration);
+    return this;
+  }
+
   public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
       final Class<? extends OutputFormat> outputFormatClass,
       final Class<? extends InputFormat> inputFormatClass) {
@@ -298,6 +304,10 @@
             .newReduceDriver(reducer).withCounters(getCounters())
             .withConfiguration(configuration).withInputKey(inputKey)
             .withInputValues(inputValues);
+        if (getOutputCopyingOrInputFormatConfiguration() != null) {
+          reduceDriver
+              .withOutputCopyingOrInputFormatConfiguration(getOutputCopyingOrInputFormatConfiguration());
+        }
         if (outputFormatClass != null) {
           reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
         }
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index 4649d15..0bd672b 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -217,6 +217,12 @@
     return this;
   }
 
+  public ReduceDriver<K1, V1, K2, V2> withOutputCopyingOrInputFormatConfiguration(
+      Configuration configuration) {
+    setOutputCopyingOrInputFormatConfiguration(configuration);
+    return this;
+  }
+
   public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
       final Class<? extends OutputFormat> outputFormatClass,
       final Class<? extends InputFormat> inputFormatClass) {
@@ -238,7 +244,8 @@
 
     try {
       final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
-          .createOutputCollectable(getConfiguration());
+          .createOutputCollectable(getConfiguration(),
+              getOutputCopyingOrInputFormatConfiguration());
       final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
           inputs, getCounters(), getConfiguration(), outputCollectable);
       myReducer.run(wrapper.getMockContext());