MRUNIT-133: Move map input path handling from TestDriver to the map-capable drivers
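
A minimal usage sketch of the relocated API (the class name, path and values below
are illustrative only; Hadoop's IdentityMapper stands in for a real mapper):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mrunit.MapDriver;

    public class MapInputPathUsage {
      public void runExample() throws IOException {
        MapDriver<Text, Text, Text, Text> driver =
            MapDriver.newMapDriver(new IdentityMapper<Text, Text>());
        // The map input path is now configured on the map-capable driver itself and
        // is surfaced to the mapper via the FileSplit behind reporter.getInputSplit().
        driver.withMapInputPath(new Path("myfile"))
              .withInput(new Text("key"), new Text("value"))
              .withOutput(new Text("key"), new Text("value"))
              .runTest();
      }
    }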

git-svn-id: https://svn.apache.org/repos/asf/mrunit/trunk@1371388 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index 6800421..fa9cff2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.mrunit;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
 
 import java.io.IOException;
 import java.util.List;
@@ -82,7 +82,7 @@
     setCounters(ctrs);
     return this;
   }
-
+  
   /**
    * Set the Mapper instance to use with this test driver
    * 
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java b/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
index ae58bb0..c2269e1 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -41,7 +42,9 @@
   public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
 
   protected List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
-  
+
+  protected Path mapInputPath = new Path("somefile");
+
   @Deprecated
   protected K1 inputKey;
   @Deprecated
@@ -97,6 +100,7 @@
    * @param inputRecord
    *          a (key, val) pair
    */
+  @SuppressWarnings("deprecation")
   public void setInput(final Pair<K1, V1> inputRecord) {
     setInputKey(inputRecord.getFirst());
     setInputValue(inputRecord.getSecond());
@@ -340,8 +344,33 @@
   }
 
   /**
+   * @return the Path passed to the mapper's InputSplit
+   */
+  public Path getMapInputPath() {
+    return mapInputPath;
+  }
+
+  /**
+   * @param mapInputPath the Path to be passed to the mapper's InputSplit
+   */
+  public void setMapInputPath(Path mapInputPath) {
+    this.mapInputPath = mapInputPath;
+  }
+  
+  /**
+   * @param mapInputPath
+   *          the Path to be passed to the mapper's InputSplit
+   * @return this driver object for fluent method chaining
+   */
+  public final T withMapInputPath(Path mapInputPath) {
+    setMapInputPath(mapInputPath);
+    return thisAsTestDriver();
+  }
+  
+  /**
    * Handle inputKey and inputVal for backwards compatibility.
    */
+  @SuppressWarnings("deprecation")
   protected void preRunChecks(Object mapper) {
     if (inputKey != null && inputVal != null) {
       setInput(inputKey, inputVal);
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
index f87dd0f..49f5cd3 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -51,6 +52,8 @@
   public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
 
   protected List<Pair<K1, V1>> inputList = new ArrayList<Pair<K1, V1>>();
+  
+  protected Path mapInputPath = new Path("somefile");
 
   /** Key group comparator */
   protected Comparator<K2> keyGroupComparator;
@@ -270,6 +273,30 @@
     addAllOutput(outputRecords);
     return thisAsMapReduceDriver();
   }
+  
+  /**
+   * @return the Path passed to the mapper's InputSplit
+   */
+  public Path getMapInputPath() {
+    return mapInputPath;
+  }
+
+  /**
+   * @param mapInputPath the Path to be passed to the mapper's InputSplit
+   */
+  public void setMapInputPath(Path mapInputPath) {
+    this.mapInputPath = mapInputPath;
+  }
+  
+  /**
+   * @param mapInputPath
+   *          the Path to be passed to the mapper's InputSplit
+   * @return this driver object for fluent method chaining
+   */
+  public final T withMapInputPath(Path mapInputPath) {
+    setMapInputPath(mapInputPath);
+    return thisAsTestDriver();
+  }
 
   protected void preRunChecks(Object mapper, Object reducer) {
     if (inputList.isEmpty()) {
diff --git a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
index 516df7b..fce3ed4 100644
--- a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
@@ -25,6 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
@@ -61,6 +62,8 @@
   private List<Pair<Mapper, Reducer>> mapReducePipeline;
   private final List<Pair<K1, V1>> inputList;
   private Counters counters;
+  
+  protected Path mapInputPath = new Path("somefile");
 
   public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
     this();
@@ -308,6 +311,30 @@
     addOutputFromString(output);
     return this;
   }
+  
+  /**
+   * @return the Path passed to the mapper's InputSplit
+   */
+  public Path getMapInputPath() {
+    return mapInputPath;
+  }
+
+  /**
+   * @param mapInputPath the Path to be passed to the mapper's InputSplit
+   */
+  public void setMapInputPath(Path mapInputPath) {
+    this.mapInputPath = mapInputPath;
+  }
+  
+  /**
+   * @param mapInputPath
+   *          the Path to be passed to the mapper's InputSplit
+   * @return this driver object for fluent method chaining
+   */
+  public final PipelineMapReduceDriver<K1, V1, K2, V2> withMapInputPath(Path mapInputPath) {
+    setMapInputPath(mapInputPath);
+    return this;
+  }
 
   @Override
   @SuppressWarnings("unchecked")
@@ -329,6 +356,7 @@
 
       mrDriver.setCounters(getCounters());
       mrDriver.setConfiguration(configuration);
+      mrDriver.setMapInputPath(mapInputPath);
 
       // Add the inputs from the user, or from the previous stage of the
       // pipeline.
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 76ecfa2..0a456b8 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -126,8 +126,7 @@
           .createOutputCollectable(getConfiguration(),
               getOutputCopyingOrInputFormatConfiguration());
       final MockReporter reporter = new MockReporter(
-          MockReporter.ReporterType.Reducer, getCounters(),
-          getMapInputPath());
+          MockReporter.ReporterType.Reducer, getCounters());
 
       ReflectionUtils.setConf(myReducer, new JobConf(getConfiguration()));
 
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index 03af72a..d7a3ded 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -56,8 +56,6 @@
   protected CounterWrapper counterWrapper;
 
   protected Serialization serialization;
-  
-  protected Path mapInputPath = new Path("somefile");
 
   public TestDriver() {
     expectedOutputs = new ArrayList<Pair<K2, V2>>();
@@ -104,7 +102,7 @@
   }
   
   @SuppressWarnings("unchecked")
-  private T thisAsTestDriver() {
+  protected T thisAsTestDriver() {
     return (T) this;
   }
 
@@ -193,30 +191,6 @@
   }
 
   /**
-   * @return the path passed to the mapper InputSplit
-   */
-  public Path getMapInputPath() {
-    return mapInputPath;
-  }
-
-  /**
-   * @param mapInputPath Path which is to be passed to the mappers InputSplit
-   */
-  public void setMapInputPath(Path mapInputPath) {
-    this.mapInputPath = mapInputPath;
-  }
-  
-  /**
-   * @param mapInputPath
-   *       The Path object which will be given to the mapper
-   * @return
-   */
-  public final T withMapInputPath(Path mapInputPath) {
-    setMapInputPath(mapInputPath);
-    return thisAsTestDriver();
-  }
-
-  /**
    * Adds a file to be put on the distributed cache. 
    * The path may be relative and will try to be resolved from
    * the classpath of the test. 
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
index 6855336..e28b48d 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
@@ -34,11 +34,19 @@
 
   private final ReporterType typ;
 
+  public MockReporter(final ReporterType kind, final Counters ctrs) {
+    this(kind, ctrs, null);
+  }
+  
   public MockReporter(final ReporterType kind, final Counters ctrs,
       final Path mapInputPath) {
     typ = kind;
     counters = ctrs;
-    inputSplit = new MockInputSplit(mapInputPath);
+    if (mapInputPath == null) {
+      inputSplit = null;
+    } else {
+      inputSplit = new MockInputSplit(mapInputPath);
+    }
   }
 
   @Override
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index 497c667..8bd3734 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.mrunit.mapreduce;
 
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
 
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;
diff --git a/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java b/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
new file mode 100644
index 0000000..e4b50b7
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+class InputPathStoringMapper<VALUEIN, VALUEOUT> extends MapReduceBase implements
+    Mapper<Text, VALUEIN, Text, VALUEOUT> {
+  private Path mapInputPath;
+
+  @Override
+  public void map(Text key, VALUEIN value, OutputCollector<Text, VALUEOUT> output,
+      Reporter reporter) throws IOException {
+    if (reporter.getInputSplit() instanceof FileSplit) {
+      mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
+    }
+  }
+
+  Path getMapInputPath() {
+    return mapInputPath;
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
index 7189d87..ed54646 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -543,26 +542,9 @@
     driver.runTest();
   }
 
-  private static class InputPathStoringMapper extends MapReduceBase implements
-      Mapper<Text, Text, Text, Text> {
-    private Path mapInputPath;
-
-    @Override
-    public void map(Text key, Text value, OutputCollector<Text, Text> output,
-        Reporter reporter) throws IOException {
-      if (reporter.getInputSplit() instanceof FileSplit) {
-        mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
-      }
-    }
-
-    private Path getMapInputPath() {
-      return mapInputPath;
-    }
-  }
-
   @Test
   public void testMapInputFile() throws IOException {
-    InputPathStoringMapper mapper = new InputPathStoringMapper();
+    InputPathStoringMapper<Text, Text> mapper = new InputPathStoringMapper<Text, Text>();
     Path mapInputPath = new Path("myfile");
     driver = MapDriver.newMapDriver(mapper);
     driver.setMapInputPath(mapInputPath);
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
index 4912d81..47d39f5 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
@@ -566,27 +566,10 @@
     driver.runTest();
   }
 
-  private static class InputPathStoringMapper extends MapReduceBase implements
-      Mapper<Text, LongWritable, Text, LongWritable> {
-    private Path mapInputPath;
-
-    @Override
-    public void map(Text key, LongWritable value,
-        OutputCollector<Text, LongWritable> output, Reporter reporter)
-        throws IOException {
-      if (reporter.getInputSplit() instanceof FileSplit) {
-        mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
-      }
-    }
-
-    private Path getMapInputPath() {
-      return mapInputPath;
-    }
-  }
-
   @Test
   public void testMapInputFile() throws IOException {
-    InputPathStoringMapper mapper = new InputPathStoringMapper();
+    InputPathStoringMapper<LongWritable, LongWritable> mapper =
+        new InputPathStoringMapper<LongWritable, LongWritable>();
     Path mapInputPath = new Path("myfile");
     driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
     driver.setMapInputPath(mapInputPath);
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
index af7d275..f384348 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.mrunit;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -306,4 +309,20 @@
     driver.addOutput(key, value);
     driver.runTest();
   }
+  
+  @Test
+  public void testMapInputFile() throws IOException {
+    InputPathStoringMapper<LongWritable, LongWritable> mapper =
+        new InputPathStoringMapper<LongWritable, LongWritable>();
+    Path mapInputPath = new Path("myfile");
+    final PipelineMapReduceDriver<Text, LongWritable, Text, LongWritable> driver = PipelineMapReduceDriver
+        .newPipelineMapReduceDriver();
+    driver.addMapReduce(mapper, new IdentityReducer<Text, LongWritable>());
+    driver.setMapInputPath(mapInputPath);
+    assertEquals(mapInputPath.getName(), driver.getMapInputPath().getName());
+    driver.withInput(new Text("a"), new LongWritable(1));
+    driver.runTest();
+    assertNotNull(mapper.getMapInputPath());
+    assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
+  }
 }