MRUNIT-101: allow use of real OutputFormats such as TextOutputFormat instead of the MockOutputCollector/MockContextWrapper classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/mrunit/trunk@1336519 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index 8e363f2..0ba6d1f 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -27,11 +27,14 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
-import org.apache.hadoop.mrunit.internal.mapred.MockOutputCollector;
import org.apache.hadoop.mrunit.internal.mapred.MockReporter;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -51,6 +54,8 @@
private Mapper<K1, V1, K2, V2> myMapper;
private Counters counters;
+ private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
public MapDriver(final Mapper<K1, V1, K2, V2> m) {
this();
setMapper(m);
@@ -212,6 +217,13 @@
return this;
}
+ public MapDriver<K1, V1, K2, V2> withOutputFormat(
+ final Class<? extends OutputFormat> outputFormatClass,
+ final Class<? extends InputFormat> inputFormatClass) {
+ mockOutputCreator.setMapredFormats(outputFormatClass, inputFormatClass);
+ return this;
+ }
+
@Override
public List<Pair<K2, V2>> run() throws IOException {
if (inputKey == null || inputVal == null) {
@@ -221,8 +233,8 @@
throw new IllegalStateException("No Mapper class was provided");
}
- final MockOutputCollector<K2, V2> outputCollector = new MockOutputCollector<K2, V2>(
- getConfiguration());
+ final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+ .createOutputCollectable(getConfiguration());
final MockReporter reporter = new MockReporter(
MockReporter.ReporterType.Mapper, getCounters());
@@ -230,9 +242,9 @@
((Configurable) myMapper).setConf(getConfiguration());
}
myMapper.configure(new JobConf(getConfiguration()));
- myMapper.map(inputKey, inputVal, outputCollector, reporter);
+ myMapper.map(inputKey, inputVal, outputCollectable, reporter);
myMapper.close();
- return outputCollector.getOutputs();
+ return outputCollectable.getOutputs();
}
@Override
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
index 838aa42..7d12dd3 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
@@ -28,7 +28,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
import org.apache.hadoop.mrunit.types.Pair;
@@ -57,6 +59,9 @@
private Reducer<K2, V2, K2, V2> myCombiner;
private Counters counters;
+ private Class<? extends OutputFormat> outputFormatClass;
+ private Class<? extends InputFormat> inputFormatClass;
+
public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
final Reducer<K2, V2, K3, V3> r) {
this();
@@ -264,6 +269,14 @@
return this;
}
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
+ final Class<? extends OutputFormat> outputFormatClass,
+ final Class<? extends InputFormat> inputFormatClass) {
+ this.outputFormatClass = returnNonNull(outputFormatClass);
+ this.inputFormatClass = returnNonNull(inputFormatClass);
+ return this;
+ }
+
/**
* The private class to manage starting the reduce phase is used for type
* genericity reasons. This class is used in the run() method.
@@ -283,11 +296,15 @@
LOG.debug("Reducing input (" + inputKey.toString() + ", "
+ sb.toString() + ")");
- reduceOutputs.addAll(ReduceDriver.newReduceDriver(reducer)
- .withCounters(getCounters()).withConfiguration(configuration)
- .withInputKey(inputKey).withInputValues(inputValues).run());
+ final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
+ .newReduceDriver(reducer).withCounters(getCounters())
+ .withConfiguration(configuration).withInputKey(inputKey)
+ .withInputValues(inputValues);
+ if (outputFormatClass != null) {
+ reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
+ }
+ reduceOutputs.addAll(reduceDriver.run());
}
-
return reduceOutputs;
}
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 8ca8827..1926cf6 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -26,11 +26,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
-import org.apache.hadoop.mrunit.internal.mapred.MockOutputCollector;
import org.apache.hadoop.mrunit.internal.mapred.MockReporter;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -52,6 +55,8 @@
private Reducer<K1, V1, K2, V2> myReducer;
private Counters counters;
+ private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
this();
setReducer(r);
@@ -222,6 +227,13 @@
return this;
}
+ public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
+ final Class<? extends OutputFormat> outputFormatClass,
+ final Class<? extends InputFormat> inputFormatClass) {
+ mockOutputCreator.setMapredFormats(outputFormatClass, inputFormatClass);
+ return this;
+ }
+
@Override
public List<Pair<K2, V2>> run() throws IOException {
if (inputKey == null || getInputValues().isEmpty()) {
@@ -231,18 +243,16 @@
throw new IllegalStateException("No Reducer class was provided");
}
- final MockOutputCollector<K2, V2> outputCollector = new MockOutputCollector<K2, V2>(
- getConfiguration());
+ final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+ .createOutputCollectable(getConfiguration());
final MockReporter reporter = new MockReporter(
MockReporter.ReporterType.Reducer, getCounters());
myReducer.configure(new JobConf(getConfiguration()));
- myReducer.reduce(inputKey, getInputValues().iterator(), outputCollector,
+ myReducer.reduce(inputKey, getInputValues().iterator(), outputCollectable,
reporter);
myReducer.close();
-
- final List<Pair<K2, V2>> outputs = outputCollector.getOutputs();
- return outputs;
+ return outputCollectable.getOutputs();
}
@Override
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java
index 2e728da..9dd083e 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java
@@ -30,7 +30,7 @@
* class.
*/
@SuppressWarnings("deprecation")
-public class MockInputSplit extends FileSplit implements InputSplit {
+class MockInputSplit extends FileSplit implements InputSplit {
private static final Path MOCK_PATH = new Path("somefile");
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
new file mode 100644
index 0000000..54b3f63
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MockMapredOutputFormat<K, V> implements OutputCollectable<K, V> {
+
+ private static final String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
+ private static final TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
+
+ private final JobConf conf;
+ private final File outputPath = new File(
+ System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
+ private final File outputFile = new File(outputPath, "part-00000");
+ private RecordWriter recordWriter;
+ private final InputFormat inputFormat;
+ private final OutputFormat outputFormat;
+ private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
+
+ public MockMapredOutputFormat(JobConf conf,
+ Class<? extends OutputFormat> outputFormatClass,
+ Class<? extends InputFormat> inputFormatClass) throws IOException {
+ this.conf = conf;
+
+ outputFormat = ReflectionUtils.newInstance(outputFormatClass, conf);
+ inputFormat = ReflectionUtils.newInstance(inputFormatClass, conf);
+
+ if (outputPath.exists()) {
+ throw new IllegalStateException(
+ "Generated the same random dir name twice: " + outputPath);
+ }
+ if (!outputPath.mkdir()) {
+ throw new IOException("Failed to create output dir " + outputPath);
+ }
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath.toString()));
+ conf.set("mapred.task.id", TASK_ID.toString());
+ FileSystem.getLocal(conf).mkdirs(
+ new Path(outputPath.toString(), FileOutputCommitter.TEMP_DIR_NAME));
+ }
+
+ private void setClassIfUnset(String name, Class<?> classType) {
+ conf.setIfUnset(name, classType.getName());
+ }
+
+ @Override
+ public void collect(K key, V value) throws IOException {
+ // only set if classes are unset to allow setting higher level class when
+ // using multiple subtypes
+ if (recordWriter == null) {
+ setClassIfUnset("mapred.output.key.class", key.getClass());
+ setClassIfUnset("mapred.output.value.class", value.getClass());
+
+ recordWriter = outputFormat.getRecordWriter(FileSystem.getLocal(conf),
+ conf, outputFile.getName(), Reporter.NULL);
+ }
+
+ recordWriter.write(key, value);
+ }
+
+ @Override
+ public List<Pair<K, V>> getOutputs() throws IOException {
+ recordWriter.close(Reporter.NULL);
+
+ FileInputFormat.setInputPaths(conf, outputPath + "/*/*/*/*");
+ for (InputSplit inputSplit : inputFormat.getSplits(conf, 1)) {
+ final RecordReader<K, V> recordReader = inputFormat.getRecordReader(
+ inputSplit, conf, Reporter.NULL);
+ K key = recordReader.createKey();
+ V value = recordReader.createValue();
+ while (recordReader.next(key, value)) {
+ outputs.add(new Pair<K, V>(key, value));
+ key = recordReader.createKey();
+ value = recordReader.createValue();
+ }
+ }
+ FileUtil.fullyDelete(outputPath);
+ return outputs;
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
similarity index 75%
rename from src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockContextWrapper.java
rename to src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
index 9e811b9..07981d1 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.mrunit.internal.mapreduce;
import static org.mockito.Matchers.any;
@@ -24,28 +23,31 @@
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mrunit.internal.io.Serialization;
-import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-public abstract class MockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+abstract class AbstractMockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT, CONTEXT extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
+
protected final Counters counters;
protected final Configuration conf;
- protected final Serialization serialization;
- protected final List<Pair<KEYOUT, VALUEOUT>> outputs = new ArrayList<Pair<KEYOUT, VALUEOUT>>();
- public MockContextWrapper(final Counters counters, final Configuration conf) {
+ protected final CONTEXT context;
+ private final OutputCollectable<KEYOUT, VALUEOUT> outputCollectable;
+
+ public AbstractMockContextWrapper(final Counters counters,
+ final Configuration conf,
+ final OutputCollectable<KEYOUT, VALUEOUT> outputCollectable)
+ throws IOException, InterruptedException {
this.conf = conf;
- serialization = new Serialization(conf);
this.counters = counters;
+ this.outputCollectable = outputCollectable;
+ context = create();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -77,11 +79,19 @@
@Override
public Object answer(final InvocationOnMock invocation) {
final Object[] args = invocation.getArguments();
- outputs.add(new Pair(serialization.copy(args[0]), serialization
- .copy(args[1])));
+ try {
+ outputCollectable.collect((KEYOUT) args[0], (VALUEOUT) args[1]);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
return null;
}
}).when(context).write((KEYOUT) any(), (VALUEOUT) any());
+ }
+ protected abstract CONTEXT create() throws IOException, InterruptedException;
+
+ public CONTEXT getMockContext() {
+ return context;
}
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java
index ea8b4ce..9e6482a 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java
@@ -24,7 +24,7 @@
/**
* Mock implementation of InputSplit that does nothing.
*/
-public class MockInputSplit extends FileSplit {
+class MockInputSplit extends FileSplit {
private static final Path MOCK_PATH = new Path("somefile");
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
index b285f9f..2473784 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -43,27 +44,27 @@
*
* This wrapper class exists for that purpose.
*/
-public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
- MockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends
+ AbstractMockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context> {
protected static final Log LOG = LogFactory
.getLog(MockMapContextWrapper.class);
+
protected final List<Pair<KEYIN, VALUEIN>> inputs;
protected Pair<KEYIN, VALUEIN> currentKeyValue;
- protected final Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context;
- protected InputSplit inputSplit;
-
+ protected InputSplit inputSplit = new MockInputSplit();
+
public MockMapContextWrapper(final List<Pair<KEYIN, VALUEIN>> inputs,
- final Counters counters, final Configuration conf, final InputSplit inputSplit)
+ final Counters counters, final Configuration conf,
+ final OutputCollectable<KEYOUT, VALUEOUT> outputCollectable)
throws IOException, InterruptedException {
- super(counters, conf);
+ super(counters, conf, outputCollectable);
this.inputs = inputs;
- this.inputSplit = inputSplit;
- context = create();
}
@SuppressWarnings({ "unchecked" })
- private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
+ protected Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
throws IOException, InterruptedException {
final Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context = mock(org.apache.hadoop.mapreduce.Mapper.Context.class);
@@ -105,15 +106,4 @@
});
return context;
}
-
- /**
- * @return the outputs from the MockOutputCollector back to the test harness.
- */
- public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
- return outputs;
- }
-
- public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMockContext() {
- return context;
- }
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
new file mode 100644
index 0000000..24a7fad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mrunit.internal.io.Serialization;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V> {
+
+ private static String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
+ private static TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
+ private static final Class<?>[] TASK_ATTEMPT_CONTEXT_CLASSES = new Class<?>[] {
+ Configuration.class, TaskAttemptID.class };
+ private static final Class<?>[] JOB_CONTEXT_CLASSES = new Class<?>[] {
+ Configuration.class, JobID.class };
+
+ private final Job job;
+ private final File outputPath = new File(
+ System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
+ private TaskAttemptContext taskAttemptContext;
+ private RecordWriter recordWriter;
+ private final InputFormat inputFormat;
+ private final OutputFormat outputFormat;
+ private final Serialization serialization;
+ private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
+
+ public MockMapreduceOutputFormat(Job job,
+ Class<? extends OutputFormat> outputFormatClass,
+ Class<? extends InputFormat> inputFormatClass) throws IOException {
+ this.job = job;
+
+ outputFormat = ReflectionUtils.newInstance(outputFormatClass,
+ job.getConfiguration());
+ inputFormat = ReflectionUtils.newInstance(inputFormatClass,
+ job.getConfiguration());
+ serialization = new Serialization((job.getConfiguration()));
+
+ if (outputPath.exists()) {
+ throw new IllegalStateException(
+ "Generated the same random dir name twice: " + outputPath);
+ }
+ if (!outputPath.mkdir()) {
+ throw new IOException("Failed to create output dir " + outputPath);
+ }
+ FileOutputFormat.setOutputPath(job, new Path(outputPath.toString()));
+ }
+
+ private void setClassIfUnset(String name, Class<?> classType) {
+ job.getConfiguration().setIfUnset(name, classType.getName());
+ }
+
+ private Object createObject(String primaryClassName,
+ String secondaryClassName, Class<?>[] constructorParametersClasses,
+ Object... constructorParameters) {
+ try {
+ Class<?> classType = Class.forName(primaryClassName);
+ try {
+ Constructor<?> constructor = classType
+ .getConstructor(constructorParametersClasses);
+ return constructor.newInstance(constructorParameters);
+ } catch (SecurityException e) {
+ throw new IllegalStateException(e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalStateException(e);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalStateException(e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException(e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalStateException(e);
+ }
+ } catch (ClassNotFoundException e) {
+ if (secondaryClassName == null) {
+ throw new IllegalStateException(e);
+ }
+ return createObject(secondaryClassName, null,
+ constructorParametersClasses, constructorParameters);
+ }
+ }
+
+ @Override
+ public void collect(K key, V value) throws IOException {
+ try {
+ if (recordWriter == null) {
+ setClassIfUnset("mapred.output.key.class", key.getClass());
+ setClassIfUnset("mapred.output.value.class", value.getClass());
+
+ taskAttemptContext = (TaskAttemptContext) createObject(
+ "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
+ "org.apache.hadoop.mapreduce.TaskAttemptContext",
+ TASK_ATTEMPT_CONTEXT_CLASSES, job.getConfiguration(), TASK_ID);
+ recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+ }
+
+ recordWriter.write(key, value);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public List<Pair<K, V>> getOutputs() throws IOException {
+ try {
+ recordWriter.close(taskAttemptContext);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ FileInputFormat.setInputPaths(job, outputPath + "/*/*/*/*");
+ try {
+ List<InputSplit> inputSplits = inputFormat
+ .getSplits((JobContext) createObject(
+ "org.apache.hadoop.mapreduce.task.JobContextImpl",
+ "org.apache.hadoop.mapreduce.JobContext", JOB_CONTEXT_CLASSES,
+ job.getConfiguration(), new JobID()));
+ for (InputSplit inputSplit : inputSplits) {
+ RecordReader<K, V> recordReader = inputFormat.createRecordReader(
+ inputSplit, taskAttemptContext);
+ recordReader.initialize(inputSplit, taskAttemptContext);
+ while (recordReader.nextKeyValue()) {
+ outputs.add(new Pair<K, V>(serialization.copy(recordReader
+ .getCurrentKey()), serialization.copy(recordReader
+ .getCurrentValue())));
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ FileUtil.fullyDelete(outputPath);
+ return outputs;
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
index 0654f3d..46d7d26 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -43,25 +44,26 @@
*
* This wrapper class exists for that purpose.
*/
-public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
- MockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+ extends
+ AbstractMockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context> {
protected static final Log LOG = LogFactory
.getLog(MockReduceContextWrapper.class);
protected final List<Pair<KEYIN, List<VALUEIN>>> inputs;
protected Pair<KEYIN, List<VALUEIN>> currentKeyValue;
- protected final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context;
public MockReduceContextWrapper(
final List<Pair<KEYIN, List<VALUEIN>>> inputs, final Counters counters,
- final Configuration conf) throws IOException, InterruptedException {
- super(counters, conf);
+ final Configuration conf,
+ OutputCollectable<KEYOUT, VALUEOUT> outputCollectable)
+ throws IOException, InterruptedException {
+ super(counters, conf, outputCollectable);
this.inputs = inputs;
- context = create();
}
@SuppressWarnings({ "unchecked" })
- private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
+ protected Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
throws IOException, InterruptedException {
final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context = mock(org.apache.hadoop.mapreduce.Reducer.Context.class);
@@ -97,13 +99,6 @@
return context;
}
- /**
- * @return the outputs from the MockOutputCollector back to the test harness.
- */
- public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
- return outputs;
- }
-
protected static <V> Iterable<V> makeOneUseIterator(final Iterator<V> parent) {
return new Iterable<V>() {
private final Iterator<V> iter = new Iterator<V>() {
@@ -137,8 +132,4 @@
}
};
}
-
- public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMockContext() {
- return context;
- }
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockOutputCollector.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCollector.java
similarity index 89%
rename from src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockOutputCollector.java
rename to src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCollector.java
index acc01cf..de84935 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockOutputCollector.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCollector.java
@@ -15,14 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.mrunit.internal.mapred;
+package org.apache.hadoop.mrunit.internal.output;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mrunit.internal.io.Serialization;
import org.apache.hadoop.mrunit.types.Pair;
@@ -31,9 +30,9 @@
* Accepts a set of output (k, v) pairs and returns them to the framework for
* validation.
*/
-public class MockOutputCollector<K, V> implements OutputCollector<K, V> {
+public class MockOutputCollector<K, V> implements OutputCollectable<K, V> {
- private final ArrayList<Pair<K, V>> collectedOutputs;
+ private final List<Pair<K, V>> collectedOutputs;
private final Serialization serialization;
public MockOutputCollector(final Configuration conf) {
@@ -54,6 +53,7 @@
/**
* @return The outputs generated by the mapper/reducer being tested
*/
+ @Override
public List<Pair<K, V>> getOutputs() {
return collectedOutputs;
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
new file mode 100644
index 0000000..8ea2497
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.output;
+
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mrunit.internal.mapred.MockMapredOutputFormat;
+import org.apache.hadoop.mrunit.internal.mapreduce.MockMapreduceOutputFormat;
+
+public class MockOutputCreator<K, V> {
+
+ private Class<? extends OutputFormat> mapredOutputFormatClass;
+ private Class<? extends InputFormat> mapredInputFormatClass;
+
+ private Class<? extends org.apache.hadoop.mapreduce.OutputFormat> mapreduceOutputFormatClass;
+ private Class<? extends org.apache.hadoop.mapreduce.InputFormat> mapreduceInputFormatClass;
+
+ public void setMapredFormats(Class<? extends OutputFormat> outputFormatClass,
+ Class<? extends InputFormat> inputFormatClass) {
+ mapredOutputFormatClass = returnNonNull(outputFormatClass);
+ mapredInputFormatClass = returnNonNull(inputFormatClass);
+ }
+
+ public void setMapreduceFormats(
+ Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass,
+ Class<? extends org.apache.hadoop.mapreduce.InputFormat> inputFormatClass) {
+ mapreduceOutputFormatClass = returnNonNull(outputFormatClass);
+ mapreduceInputFormatClass = returnNonNull(inputFormatClass);
+ }
+
+ public OutputCollectable<K, V> createOutputCollectable(Configuration conf)
+ throws IOException {
+ if (mapredOutputFormatClass != null) {
+ return new MockMapredOutputFormat<K, V>(new JobConf(conf),
+ mapredOutputFormatClass, mapredInputFormatClass);
+ }
+ if (mapreduceOutputFormatClass != null) {
+ return new MockMapreduceOutputFormat<K, V>(new Job(conf),
+ mapreduceOutputFormatClass, mapreduceInputFormatClass);
+ }
+ return new MockOutputCollector<K, V>(conf);
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/OutputCollectable.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/OutputCollectable.java
new file mode 100644
index 0000000..24bb597
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/OutputCollectable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mrunit.types.Pair;
+
+public interface OutputCollectable<K, V> extends OutputCollector<K, V> {
+
+ List<Pair<K, V>> getOutputs() throws IOException;
+
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
index 7c7a658..1e690ba 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
@@ -28,12 +28,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mrunit.MapDriverBase;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
-import org.apache.hadoop.mrunit.internal.mapreduce.MockInputSplit;
import org.apache.hadoop.mrunit.internal.mapreduce.MockMapContextWrapper;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -52,6 +54,8 @@
private Mapper<K1, V1, K2, V2> myMapper;
private Counters counters;
+ private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
public MapDriver(final Mapper<K1, V1, K2, V2> m) {
this();
setMapper(m);
@@ -199,6 +203,13 @@
return this;
}
+ public MapDriver<K1, V1, K2, V2> withOutputFormat(
+ final Class<? extends OutputFormat> outputFormatClass,
+ final Class<? extends InputFormat> inputFormatClass) {
+ mockOutputCreator.setMapreduceFormats(outputFormatClass, inputFormatClass);
+ return this;
+ }
+
@Override
public List<Pair<K2, V2>> run() throws IOException {
if (inputKey == null || inputVal == null) {
@@ -211,15 +222,13 @@
final List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
inputs.add(new Pair<K1, V1>(inputKey, inputVal));
- final InputSplit inputSplit = new MockInputSplit();
-
try {
+ final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+ .createOutputCollectable(getConfiguration());
final MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(
- inputs, getCounters(), getConfiguration(), inputSplit);
-
- final Mapper<K1, V1, K2, V2>.Context context = wrapper.getMockContext();
- myMapper.run(context);
- return wrapper.getOutputs();
+ inputs, getCounters(), getConfiguration(), outputCollectable);
+ myMapper.run(wrapper.getMockContext());
+ return outputCollectable.getOutputs();
} catch (final InterruptedException ie) {
throw new IOException(ie);
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index ffc024f..eaa0e76 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -28,7 +28,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.MapReduceDriverBase;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
@@ -55,6 +57,9 @@
private Reducer<K2, V2, K2, V2> myCombiner;
private Counters counters;
+ private Class<? extends OutputFormat> outputFormatClass;
+ private Class<? extends InputFormat> inputFormatClass;
+
public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
final Reducer<K2, V2, K3, V3> r) {
this();
@@ -262,6 +267,14 @@
return this;
}
+ public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
+ final Class<? extends OutputFormat> outputFormatClass,
+ final Class<? extends InputFormat> inputFormatClass) {
+ this.outputFormatClass = returnNonNull(outputFormatClass);
+ this.inputFormatClass = returnNonNull(inputFormatClass);
+ return this;
+ }
+
/**
* The private class to manage starting the reduce phase is used for type
* genericity reasons. This class is used in the run() method.
@@ -281,9 +294,14 @@
LOG.debug("Reducing input (" + inputKey.toString() + ", "
+ sb.toString() + ")");
- reduceOutputs.addAll(ReduceDriver.newReduceDriver(reducer)
- .withCounters(getCounters()).withConfiguration(configuration)
- .withInputKey(inputKey).withInputValues(inputValues).run());
+ final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
+ .newReduceDriver(reducer).withCounters(getCounters())
+ .withConfiguration(configuration).withInputKey(inputKey)
+ .withInputValues(inputValues);
+ if (outputFormatClass != null) {
+ reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
+ }
+ reduceOutputs.addAll(reduceDriver.run());
}
return reduceOutputs;
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index e365ee4..4649d15 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -28,10 +28,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.ReduceDriverBase;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
import org.apache.hadoop.mrunit.internal.mapreduce.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -52,6 +56,8 @@
private Reducer<K1, V1, K2, V2> myReducer;
private Counters counters;
+ private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
this();
setReducer(r);
@@ -211,6 +217,13 @@
return this;
}
+ public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
+ final Class<? extends OutputFormat> outputFormatClass,
+ final Class<? extends InputFormat> inputFormatClass) {
+ mockOutputCreator.setMapreduceFormats(outputFormatClass, inputFormatClass);
+ return this;
+ }
+
@Override
public List<Pair<K2, V2>> run() throws IOException {
if (inputKey == null || getInputValues().isEmpty()) {
@@ -224,11 +237,12 @@
inputs.add(new Pair<K1, List<V1>>(inputKey, getInputValues()));
try {
+ final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+ .createOutputCollectable(getConfiguration());
final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
- inputs, getCounters(), getConfiguration());
- final Reducer<K1, V1, K2, V2>.Context context = wrapper.getMockContext();
- myReducer.run(context);
- return wrapper.getOutputs();
+ inputs, getCounters(), getConfiguration(), outputCollectable);
+ myReducer.run(wrapper.getMockContext());
+ return outputCollectable.getOutputs();
} catch (final InterruptedException ie) {
throw new IOException(ie);
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
index e9574bc..c061f58 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
@@ -33,6 +33,10 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
@@ -426,4 +430,23 @@
driver.withOutput(new Text("a"), output);
driver.runTest();
}
+
+ @Test
+ public void testOutputFormat() {
+ driver.withOutputFormat(SequenceFileOutputFormat.class,
+ SequenceFileInputFormat.class);
+ driver.withInput(new Text("a"), new Text("1"));
+ driver.withOutput(new Text("a"), new Text("1"));
+ driver.runTest();
+ }
+
+ @Test
+ public void testOutputFormatWithMismatchInOutputClasses() {
+ final MapDriver<Text, Text, LongWritable, Text> driver = MapDriver
+ .newMapDriver(new IdentityMapper());
+ driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+ driver.withInput(new Text("a"), new Text("1"));
+ driver.withOutput(new LongWritable(), new Text("a\t1"));
+ driver.runTest();
+ }
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
index e341b60..2540cf9 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
@@ -35,6 +35,10 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.LongSumReducer;
@@ -441,4 +445,24 @@
driver.addOutput(key, value);
driver.runTest();
}
+
+ @Test
+ public void testOutputFormat() {
+ driver.withOutputFormat(SequenceFileOutputFormat.class,
+ SequenceFileInputFormat.class);
+ driver.withInput(new Text("a"), new LongWritable(1));
+ driver.withInput(new Text("a"), new LongWritable(2));
+ driver.withOutput(new Text("a"), new LongWritable(3));
+ driver.runTest();
+ }
+
+ @Test
+ public void testOutputFormatWithMismatchInOutputClasses() {
+ final MapReduceDriver driver = this.driver;
+ driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+ driver.withInput(new Text("a"), new LongWritable(1));
+ driver.withInput(new Text("a"), new LongWritable(2));
+ driver.withOutput(new LongWritable(), new Text("a\t3"));
+ driver.runTest();
+ }
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java
index 0abbff4..fba4770 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java
@@ -33,6 +33,11 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mrunit.types.Pair;
@@ -361,6 +366,28 @@
driver.runTest();
}
+ @Test
+ public void testOutputFormat() {
+ driver.withOutputFormat(SequenceFileOutputFormat.class,
+ SequenceFileInputFormat.class);
+ driver.withInputKey(new Text("a"));
+ driver.withInputValue(new LongWritable(1)).withInputValue(
+ new LongWritable(2));
+ driver.withOutput(new Text("a"), new LongWritable(3));
+ driver.runTest();
+ }
+
+ @Test
+ public void testOutputFormatWithMismatchInOutputClasses() {
+ final ReduceDriver driver = ReduceDriver.newReduceDriver(reducer);
+ driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+ driver.withInputKey(new Text("a"));
+ driver.withInputValue(new LongWritable(1)).withInputValue(
+ new LongWritable(2));
+ driver.withOutput(new LongWritable(), new Text("a\t3"));
+ driver.runTest();
+ }
+
/**
* Simple reducer that have custom counters that are increased each map() call
*/
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
index 7eaff18..10213b2 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
@@ -31,7 +31,11 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mrunit.ExpectedSuppliedException;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
@@ -299,4 +303,23 @@
driver.addOutput(new IntWritable(2), 1);
driver.runTest();
}
+
+ @Test
+ public void testOutputFormat() {
+ driver.withOutputFormat(SequenceFileOutputFormat.class,
+ SequenceFileInputFormat.class);
+ driver.withInput(new Text("a"), new Text("1"));
+ driver.withOutput(new Text("a"), new Text("1"));
+ driver.runTest();
+ }
+
+ @Test
+ public void testOutputFormatWithMismatchInOutputClasses() {
+ final MapDriver<Text, Text, LongWritable, Text> driver = MapDriver
+ .newMapDriver(new Mapper());
+ driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+ driver.withInput(new Text("a"), new Text("1"));
+ driver.withOutput(new LongWritable(), new Text("a\t1"));
+ driver.runTest();
+ }
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
index 05ead70..8d1be2f 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
@@ -32,7 +32,11 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.mrunit.ExpectedSuppliedException;
@@ -409,4 +413,24 @@
driver.withOutput(2, new IntWritable(1)).withOutput(3, new IntWritable(2))
.runTest();
}
+
+ @Test
+ public void testOutputFormat() {
+ driver.withOutputFormat(SequenceFileOutputFormat.class,
+ SequenceFileInputFormat.class);
+ driver.withInput(new Text("a"), new LongWritable(1));
+ driver.withInput(new Text("a"), new LongWritable(2));
+ driver.withOutput(new Text("a"), new LongWritable(3));
+ driver.runTest();
+ }
+
+ @Test
+ public void testOutputFormatWithMismatchInOutputClasses() {
+ final MapReduceDriver driver = this.driver;
+ driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+ driver.withInput(new Text("a"), new LongWritable(1));
+ driver.withInput(new Text("a"), new LongWritable(2));
+ driver.withOutput(new LongWritable(), new Text("a\t3"));
+ driver.runTest();
+ }
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
index 38e506e..cbb0e78 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
@@ -32,7 +32,12 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.mrunit.ExpectedSuppliedException;
@@ -371,4 +376,26 @@
driver.withInputKey(1).withInputValue(new IntWritable(2))
.withOutput(1, new IntWritable(2)).runTest();
}
+
+ @Test
+ public void testOutputFormat() {
+ driver.withOutputFormat(SequenceFileOutputFormat.class,
+ SequenceFileInputFormat.class);
+ driver.withInputKey(new Text("a"));
+ driver.withInputValue(new LongWritable(1)).withInputValue(
+ new LongWritable(2));
+ driver.withOutput(new Text("a"), new LongWritable(3));
+ driver.runTest();
+ }
+
+ @Test
+ public void testOutputFormatWithMismatchInOutputClasses() {
+ final ReduceDriver driver = ReduceDriver.newReduceDriver(reducer);
+ driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+ driver.withInputKey(new Text("a"));
+ driver.withInputValue(new LongWritable(1)).withInputValue(
+ new LongWritable(2));
+ driver.withOutput(new LongWritable(), new Text("a\t3"));
+ driver.runTest();
+ }
}