MRUNIT-133: MapInput stuff should be moved from TestDriver to MapDriver
git-svn-id: https://svn.apache.org/repos/asf/mrunit/trunk@1371388 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index 6800421..fa9cff2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.mrunit;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
import java.io.IOException;
import java.util.List;
@@ -82,7 +82,7 @@
setCounters(ctrs);
return this;
}
-
+
/**
* Set the Mapper instance to use with this test driver
*
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java b/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
index ae58bb0..c2269e1 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriverBase.java
@@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
import org.apache.hadoop.mrunit.types.Pair;
@@ -41,7 +42,9 @@
public static final Log LOG = LogFactory.getLog(MapDriverBase.class);
protected List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
-
+
+ protected Path mapInputPath = new Path("somefile");
+
@Deprecated
protected K1 inputKey;
@Deprecated
@@ -97,6 +100,7 @@
* @param inputRecord
* a (key, val) pair
*/
+ @SuppressWarnings("deprecation")
public void setInput(final Pair<K1, V1> inputRecord) {
setInputKey(inputRecord.getFirst());
setInputValue(inputRecord.getSecond());
@@ -340,8 +344,33 @@
}
/**
+ * @return the path passed to the mapper InputSplit
+ */
+ public Path getMapInputPath() {
+ return mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath Path which is to be passed to the mappers InputSplit
+ */
+ public void setMapInputPath(Path mapInputPath) {
+ this.mapInputPath = mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath
+ * The Path object which will be given to the mapper
+ * @return
+ */
+ public final T withMapInputPath(Path mapInputPath) {
+ setMapInputPath(mapInputPath);
+ return thisAsTestDriver();
+ }
+
+ /**
* Handle inputKey and inputVal for backwards compatibility.
*/
+ @SuppressWarnings("deprecation")
protected void preRunChecks(Object mapper) {
if (inputKey != null && inputVal != null) {
setInput(inputKey, inputVal);
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
index f87dd0f..49f5cd3 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriverBase.java
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -51,6 +52,8 @@
public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
protected List<Pair<K1, V1>> inputList = new ArrayList<Pair<K1, V1>>();
+
+ protected Path mapInputPath = new Path("somefile");
/** Key group comparator */
protected Comparator<K2> keyGroupComparator;
@@ -270,6 +273,30 @@
addAllOutput(outputRecords);
return thisAsMapReduceDriver();
}
+
+ /**
+ * @return the path passed to the mapper InputSplit
+ */
+ public Path getMapInputPath() {
+ return mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath Path which is to be passed to the mappers InputSplit
+ */
+ public void setMapInputPath(Path mapInputPath) {
+ this.mapInputPath = mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath
+ * The Path object which will be given to the mapper
+ * @return
+ */
+ public final T withMapInputPath(Path mapInputPath) {
+ setMapInputPath(mapInputPath);
+ return thisAsTestDriver();
+ }
protected void preRunChecks(Object mapper, Object reducer) {
if (inputList.isEmpty()) {
diff --git a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
index 516df7b..fce3ed4 100644
--- a/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/PipelineMapReduceDriver.java
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
@@ -61,6 +62,8 @@
private List<Pair<Mapper, Reducer>> mapReducePipeline;
private final List<Pair<K1, V1>> inputList;
private Counters counters;
+
+ protected Path mapInputPath = new Path("somefile");
public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
this();
@@ -308,6 +311,30 @@
addOutputFromString(output);
return this;
}
+
+ /**
+ * @return the path passed to the mapper InputSplit
+ */
+ public Path getMapInputPath() {
+ return mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath Path which is to be passed to the mappers InputSplit
+ */
+ public void setMapInputPath(Path mapInputPath) {
+ this.mapInputPath = mapInputPath;
+ }
+
+ /**
+ * @param mapInputPath
+ * The Path object which will be given to the mapper
+ * @return
+ */
+ public final PipelineMapReduceDriver<K1, V1, K2, V2> withMapInputPath(Path mapInputPath) {
+ setMapInputPath(mapInputPath);
+ return this;
+ }
@Override
@SuppressWarnings("unchecked")
@@ -329,6 +356,7 @@
mrDriver.setCounters(getCounters());
mrDriver.setConfiguration(configuration);
+ mrDriver.setMapInputPath(mapInputPath);
// Add the inputs from the user, or from the previous stage of the
// pipeline.
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 76ecfa2..0a456b8 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -126,8 +126,7 @@
.createOutputCollectable(getConfiguration(),
getOutputCopyingOrInputFormatConfiguration());
final MockReporter reporter = new MockReporter(
- MockReporter.ReporterType.Reducer, getCounters(),
- getMapInputPath());
+ MockReporter.ReporterType.Reducer, getCounters());
ReflectionUtils.setConf(myReducer, new JobConf(getConfiguration()));
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index 03af72a..d7a3ded 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -56,8 +56,6 @@
protected CounterWrapper counterWrapper;
protected Serialization serialization;
-
- protected Path mapInputPath = new Path("somefile");
public TestDriver() {
expectedOutputs = new ArrayList<Pair<K2, V2>>();
@@ -104,7 +102,7 @@
}
@SuppressWarnings("unchecked")
- private T thisAsTestDriver() {
+ protected T thisAsTestDriver() {
return (T) this;
}
@@ -193,30 +191,6 @@
}
/**
- * @return the path passed to the mapper InputSplit
- */
- public Path getMapInputPath() {
- return mapInputPath;
- }
-
- /**
- * @param mapInputPath Path which is to be passed to the mappers InputSplit
- */
- public void setMapInputPath(Path mapInputPath) {
- this.mapInputPath = mapInputPath;
- }
-
- /**
- * @param mapInputPath
- * The Path object which will be given to the mapper
- * @return
- */
- public final T withMapInputPath(Path mapInputPath) {
- setMapInputPath(mapInputPath);
- return thisAsTestDriver();
- }
-
- /**
* Adds a file to be put on the distributed cache.
* The path may be relative and will try to be resolved from
* the classpath of the test.
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
index 6855336..e28b48d 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockReporter.java
@@ -34,11 +34,19 @@
private final ReporterType typ;
+ public MockReporter(final ReporterType kind, final Counters ctrs) {
+ this(kind, ctrs, null);
+ }
+
public MockReporter(final ReporterType kind, final Counters ctrs,
final Path mapInputPath) {
typ = kind;
counters = ctrs;
- inputSplit = new MockInputSplit(mapInputPath);
+ if(mapInputPath == null) {
+ inputSplit = null;
+ } else {
+ inputSplit = new MockInputSplit(mapInputPath);
+ }
}
@Override
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index 497c667..8bd3734 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -18,14 +18,13 @@
package org.apache.hadoop.mrunit.mapreduce;
-import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
diff --git a/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java b/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
new file mode 100644
index 0000000..e4b50b7
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/mrunit/InputPathStoringMapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+class InputPathStoringMapper<VALUEIN, VALUEOUT> extends MapReduceBase implements
+ Mapper<Text, VALUEIN, Text, VALUEOUT> {
+ private Path mapInputPath;
+
+ @Override
+ public void map(Text key, VALUEIN value, OutputCollector<Text, VALUEOUT> output,
+ Reporter reporter) throws IOException {
+ if (reporter.getInputSplit() instanceof FileSplit) {
+ mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
+ }
+ }
+
+ Path getMapInputPath() {
+ return mapInputPath;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
index 7189d87..ed54646 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
@@ -543,26 +542,9 @@
driver.runTest();
}
- private static class InputPathStoringMapper extends MapReduceBase implements
- Mapper<Text, Text, Text, Text> {
- private Path mapInputPath;
-
- @Override
- public void map(Text key, Text value, OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException {
- if (reporter.getInputSplit() instanceof FileSplit) {
- mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
- }
- }
-
- private Path getMapInputPath() {
- return mapInputPath;
- }
- }
-
@Test
public void testMapInputFile() throws IOException {
- InputPathStoringMapper mapper = new InputPathStoringMapper();
+ InputPathStoringMapper<Text, Text> mapper = new InputPathStoringMapper<Text, Text>();
Path mapInputPath = new Path("myfile");
driver = MapDriver.newMapDriver(mapper);
driver.setMapInputPath(mapInputPath);
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
index 4912d81..47d39f5 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
@@ -566,27 +566,10 @@
driver.runTest();
}
- private static class InputPathStoringMapper extends MapReduceBase implements
- Mapper<Text, LongWritable, Text, LongWritable> {
- private Path mapInputPath;
-
- @Override
- public void map(Text key, LongWritable value,
- OutputCollector<Text, LongWritable> output, Reporter reporter)
- throws IOException {
- if (reporter.getInputSplit() instanceof FileSplit) {
- mapInputPath = ((FileSplit) reporter.getInputSplit()).getPath();
- }
- }
-
- private Path getMapInputPath() {
- return mapInputPath;
- }
- }
-
@Test
public void testMapInputFile() throws IOException {
- InputPathStoringMapper mapper = new InputPathStoringMapper();
+ InputPathStoringMapper<LongWritable,LongWritable> mapper =
+ new InputPathStoringMapper<LongWritable,LongWritable>();
Path mapInputPath = new Path("myfile");
driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
driver.setMapInputPath(mapInputPath);
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
index af7d275..f384348 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestPipelineMapReduceDriver.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.mrunit;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -306,4 +309,20 @@
driver.addOutput(key, value);
driver.runTest();
}
+
+ @Test
+ public void testMapInputFile() throws IOException {
+ InputPathStoringMapper<LongWritable,LongWritable> mapper =
+ new InputPathStoringMapper<LongWritable,LongWritable>();
+ Path mapInputPath = new Path("myfile");
+ final PipelineMapReduceDriver<Text, LongWritable, Text, LongWritable> driver = PipelineMapReduceDriver
+ .newPipelineMapReduceDriver();
+ driver.addMapReduce(mapper, new IdentityReducer<Text, LongWritable>());
+ driver.setMapInputPath(mapInputPath);
+ assertEquals(mapInputPath.getName(), driver.getMapInputPath().getName());
+ driver.withInput(new Text("a"), new LongWritable(1));
+ driver.runTest();
+ assertNotNull(mapper.getMapInputPath());
+ assertEquals(mapInputPath.getName(), mapper.getMapInputPath().getName());
+ }
}