MRUNIT-101: allow use of real OutputFormats such as TextOutputFormat instead of the MockOutputCollector/MockContextWrapper classes

git-svn-id: https://svn.apache.org/repos/asf/incubator/mrunit/trunk@1336519 13f79535-47bb-0310-9956-ffa450edef68
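
A minimal usage sketch of the new withOutputFormat(outputFormatClass,
inputFormatClass) hook, mirroring the tests added below (the identity mapper
and format classes come from the standard Hadoop old API; the
org.apache.hadoop.mrunit.mapreduce drivers expose the same method taking
new-API format classes). Output is written through the real OutputFormat and
read back with the paired InputFormat before the usual assertions run:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mrunit.MapDriver;

    MapDriver<Text, Text, Text, Text> driver =
        MapDriver.newMapDriver(new IdentityMapper<Text, Text>());
    driver.withOutputFormat(SequenceFileOutputFormat.class,
        SequenceFileInputFormat.class);
    driver.withInput(new Text("a"), new Text("1"));
    driver.withOutput(new Text("a"), new Text("1"));
    driver.runTest();
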
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
index 8e363f2..0ba6d1f 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapDriver.java
@@ -27,11 +27,14 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
-import org.apache.hadoop.mrunit.internal.mapred.MockOutputCollector;
 import org.apache.hadoop.mrunit.internal.mapred.MockReporter;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -51,6 +54,8 @@
   private Mapper<K1, V1, K2, V2> myMapper;
   private Counters counters;
 
+  private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
   public MapDriver(final Mapper<K1, V1, K2, V2> m) {
     this();
     setMapper(m);
@@ -212,6 +217,13 @@
     return this;
   }
 
+  public MapDriver<K1, V1, K2, V2> withOutputFormat(
+      final Class<? extends OutputFormat> outputFormatClass,
+      final Class<? extends InputFormat> inputFormatClass) {
+    mockOutputCreator.setMapredFormats(outputFormatClass, inputFormatClass);
+    return this;
+  }
+
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
     if (inputKey == null || inputVal == null) {
@@ -221,8 +233,8 @@
       throw new IllegalStateException("No Mapper class was provided");
     }
 
-    final MockOutputCollector<K2, V2> outputCollector = new MockOutputCollector<K2, V2>(
-        getConfiguration());
+    final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+        .createOutputCollectable(getConfiguration());
     final MockReporter reporter = new MockReporter(
         MockReporter.ReporterType.Mapper, getCounters());
 
@@ -230,9 +242,9 @@
       ((Configurable) myMapper).setConf(getConfiguration());
     }
     myMapper.configure(new JobConf(getConfiguration()));
-    myMapper.map(inputKey, inputVal, outputCollector, reporter);
+    myMapper.map(inputKey, inputVal, outputCollectable, reporter);
     myMapper.close();
-    return outputCollector.getOutputs();
+    return outputCollectable.getOutputs();
   }
 
   @Override
diff --git a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
index 838aa42..7d12dd3 100644
--- a/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/MapReduceDriver.java
@@ -28,7 +28,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -57,6 +59,9 @@
   private Reducer<K2, V2, K2, V2> myCombiner;
   private Counters counters;
 
+  private Class<? extends OutputFormat> outputFormatClass;
+  private Class<? extends InputFormat> inputFormatClass;
+
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
       final Reducer<K2, V2, K3, V3> r) {
     this();
@@ -264,6 +269,14 @@
     return this;
   }
 
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
+      final Class<? extends OutputFormat> outputFormatClass,
+      final Class<? extends InputFormat> inputFormatClass) {
+    this.outputFormatClass = returnNonNull(outputFormatClass);
+    this.inputFormatClass = returnNonNull(inputFormatClass);
+    return this;
+  }
+
   /**
    * This private class manages starting the reduce phase; it exists for
    * type-genericity reasons and is used in the run() method.
@@ -283,11 +296,15 @@
         LOG.debug("Reducing input (" + inputKey.toString() + ", "
             + sb.toString() + ")");
 
-        reduceOutputs.addAll(ReduceDriver.newReduceDriver(reducer)
-            .withCounters(getCounters()).withConfiguration(configuration)
-            .withInputKey(inputKey).withInputValues(inputValues).run());
+        final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
+            .newReduceDriver(reducer).withCounters(getCounters())
+            .withConfiguration(configuration).withInputKey(inputKey)
+            .withInputValues(inputValues);
+        if (outputFormatClass != null) {
+          reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
+        }
+        reduceOutputs.addAll(reduceDriver.run());
       }
-
       return reduceOutputs;
     }
   }
diff --git a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
index 8ca8827..1926cf6 100644
--- a/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/ReduceDriver.java
@@ -26,11 +26,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
-import org.apache.hadoop.mrunit.internal.mapred.MockOutputCollector;
 import org.apache.hadoop.mrunit.internal.mapred.MockReporter;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -52,6 +55,8 @@
   private Reducer<K1, V1, K2, V2> myReducer;
   private Counters counters;
 
+  private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
     this();
     setReducer(r);
@@ -222,6 +227,13 @@
     return this;
   }
 
+  public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
+      final Class<? extends OutputFormat> outputFormatClass,
+      final Class<? extends InputFormat> inputFormatClass) {
+    mockOutputCreator.setMapredFormats(outputFormatClass, inputFormatClass);
+    return this;
+  }
+
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
     if (inputKey == null || getInputValues().isEmpty()) {
@@ -231,18 +243,16 @@
       throw new IllegalStateException("No Reducer class was provided");
     }
 
-    final MockOutputCollector<K2, V2> outputCollector = new MockOutputCollector<K2, V2>(
-        getConfiguration());
+    final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+        .createOutputCollectable(getConfiguration());
     final MockReporter reporter = new MockReporter(
         MockReporter.ReporterType.Reducer, getCounters());
 
     myReducer.configure(new JobConf(getConfiguration()));
-    myReducer.reduce(inputKey, getInputValues().iterator(), outputCollector,
+    myReducer.reduce(inputKey, getInputValues().iterator(), outputCollectable,
         reporter);
     myReducer.close();
-
-    final List<Pair<K2, V2>> outputs = outputCollector.getOutputs();
-    return outputs;
+    return outputCollectable.getOutputs();
   }
 
   @Override
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java
index 2e728da..9dd083e 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockInputSplit.java
@@ -30,7 +30,7 @@
  * class.
  */
 @SuppressWarnings("deprecation")
-public class MockInputSplit extends FileSplit implements InputSplit {
+class MockInputSplit extends FileSplit implements InputSplit {
 
   private static final Path MOCK_PATH = new Path("somefile");
 
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
new file mode 100644
index 0000000..54b3f63
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockMapredOutputFormat.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MockMapredOutputFormat<K, V> implements OutputCollectable<K, V> {
+
+  private static final String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
+  private static final TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
+
+  private final JobConf conf;
+  private final File outputPath = new File(
+      System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
+  private final File outputFile = new File(outputPath, "part-00000");
+  private RecordWriter recordWriter;
+  private final InputFormat inputFormat;
+  private final OutputFormat outputFormat;
+  private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
+
+  public MockMapredOutputFormat(JobConf conf,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class<? extends InputFormat> inputFormatClass) throws IOException {
+    this.conf = conf;
+
+    outputFormat = ReflectionUtils.newInstance(outputFormatClass, conf);
+    inputFormat = ReflectionUtils.newInstance(inputFormatClass, conf);
+
+    if (outputPath.exists()) {
+      throw new IllegalStateException(
+          "Generated the same random dir name twice: " + outputPath);
+    }
+    if (!outputPath.mkdir()) {
+      throw new IOException("Failed to create output dir " + outputPath);
+    }
+    FileOutputFormat.setOutputPath(conf, new Path(outputPath.toString()));
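+    // the old-API FileOutputFormat derives its temporary work path from
+    // "mapred.task.id", so a fixed dummy attempt id must be set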
+    conf.set("mapred.task.id", TASK_ID.toString());
+    FileSystem.getLocal(conf).mkdirs(
+        new Path(outputPath.toString(), FileOutputCommitter.TEMP_DIR_NAME));
+  }
+
+  private void setClassIfUnset(String name, Class<?> classType) {
+    conf.setIfUnset(name, classType.getName());
+  }
+
+  @Override
+  public void collect(K key, V value) throws IOException {
+    // only set the key/value classes if they are still unset, so a test can
+    // configure a common superclass when multiple subtypes are emitted
+    if (recordWriter == null) {
+      setClassIfUnset("mapred.output.key.class", key.getClass());
+      setClassIfUnset("mapred.output.value.class", value.getClass());
+
+      recordWriter = outputFormat.getRecordWriter(FileSystem.getLocal(conf),
+          conf, outputFile.getName(), Reporter.NULL);
+    }
+
+    recordWriter.write(key, value);
+  }
+
+  @Override
+  public List<Pair<K, V>> getOutputs() throws IOException {
+    recordWriter.close(Reporter.NULL);
+
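+    // output may sit under the committer's nested temporary directories,
+    // hence the multi-level glob when locating files to read back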
+    FileInputFormat.setInputPaths(conf, outputPath + "/*/*/*/*");
+    for (InputSplit inputSplit : inputFormat.getSplits(conf, 1)) {
+      final RecordReader<K, V> recordReader = inputFormat.getRecordReader(
+          inputSplit, conf, Reporter.NULL);
+      K key = recordReader.createKey();
+      V value = recordReader.createValue();
+      while (recordReader.next(key, value)) {
+        outputs.add(new Pair<K, V>(key, value));
+        key = recordReader.createKey();
+        value = recordReader.createValue();
+      }
+    }
+    FileUtil.fullyDelete(outputPath);
+    return outputs;
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
similarity index 75%
rename from src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockContextWrapper.java
rename to src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
index 9e811b9..07981d1 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/AbstractMockContextWrapper.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.mrunit.internal.mapreduce;
 
 import static org.mockito.Matchers.any;
@@ -24,28 +23,31 @@
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mrunit.internal.io.Serialization;
-import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-public abstract class MockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+abstract class AbstractMockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT, CONTEXT extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>> {
+
   protected final Counters counters;
   protected final Configuration conf;
-  protected final Serialization serialization;
-  protected final List<Pair<KEYOUT, VALUEOUT>> outputs = new ArrayList<Pair<KEYOUT, VALUEOUT>>();
 
-  public MockContextWrapper(final Counters counters, final Configuration conf) {
+  protected final CONTEXT context;
+  private final OutputCollectable<KEYOUT, VALUEOUT> outputCollectable;
+
+  public AbstractMockContextWrapper(final Counters counters,
+      final Configuration conf,
+      final OutputCollectable<KEYOUT, VALUEOUT> outputCollectable)
+      throws IOException, InterruptedException {
     this.conf = conf;
-    serialization = new Serialization(conf);
     this.counters = counters;
+    this.outputCollectable = outputCollectable;
+    context = create();
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -77,11 +79,21 @@
       @Override
       public Object answer(final InvocationOnMock invocation) {
         final Object[] args = invocation.getArguments();
-        outputs.add(new Pair(serialization.copy(args[0]), serialization
-            .copy(args[1])));
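+        // collect() throws a checked IOException that this Answer does not
+        // declare, so rethrow it unchecked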
+        try {
+          outputCollectable.collect((KEYOUT) args[0], (VALUEOUT) args[1]);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
         return null;
       }
     }).when(context).write((KEYOUT) any(), (VALUEOUT) any());
+  }
 
+  protected abstract CONTEXT create() throws IOException, InterruptedException;
+
+  public CONTEXT getMockContext() {
+    return context;
   }
 }
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java
index ea8b4ce..9e6482a 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockInputSplit.java
@@ -24,7 +24,7 @@
 /**
  * Mock implementation of InputSplit that does nothing.
  */
-public class MockInputSplit extends FileSplit {
+class MockInputSplit extends FileSplit {
 
   private static final Path MOCK_PATH = new Path("somefile");
 
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
index b285f9f..2473784 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapContextWrapper.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -43,27 +44,27 @@
  * 
  * This wrapper class exists for that purpose.
  */
-public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
-    MockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class MockMapContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends
+    AbstractMockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context> {
 
   protected static final Log LOG = LogFactory
       .getLog(MockMapContextWrapper.class);
+
   protected final List<Pair<KEYIN, VALUEIN>> inputs;
   protected Pair<KEYIN, VALUEIN> currentKeyValue;
-  protected final Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context;
-  protected InputSplit inputSplit;
-  
+  protected InputSplit inputSplit = new MockInputSplit();
+
   public MockMapContextWrapper(final List<Pair<KEYIN, VALUEIN>> inputs,
-      final Counters counters, final Configuration conf, final InputSplit inputSplit) 
+      final Counters counters, final Configuration conf,
+      final OutputCollectable<KEYOUT, VALUEOUT> outputCollectable)
       throws IOException, InterruptedException {
-    super(counters, conf);
+    super(counters, conf, outputCollectable);
     this.inputs = inputs;
-    this.inputSplit = inputSplit;
-    context = create();
   }
 
   @SuppressWarnings({ "unchecked" })
-  private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
+  protected Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
       throws IOException, InterruptedException {
     final Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context = mock(org.apache.hadoop.mapreduce.Mapper.Context.class);
 
@@ -105,15 +106,4 @@
     });
     return context;
   }
-
-  /**
-   * @return the outputs from the MockOutputCollector back to the test harness.
-   */
-  public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
-    return outputs;
-  }
-
-  public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMockContext() {
-    return context;
-  }
 }
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
new file mode 100644
index 0000000..24a7fad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockMapreduceOutputFormat.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mrunit.internal.io.Serialization;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MockMapreduceOutputFormat<K, V> implements OutputCollectable<K, V> {
+
+  private static final String ATTEMPT = "attempt_000000000000_0000_m_000000_0";
+  private static final TaskAttemptID TASK_ID = TaskAttemptID.forName(ATTEMPT);
+  private static final Class<?>[] TASK_ATTEMPT_CONTEXT_CLASSES = new Class<?>[] {
+      Configuration.class, TaskAttemptID.class };
+  private static final Class<?>[] JOB_CONTEXT_CLASSES = new Class<?>[] {
+      Configuration.class, JobID.class };
+
+  private final Job job;
+  private final File outputPath = new File(
+      System.getProperty("java.io.tmpdir"), "mrunit-" + Math.random());
+  private TaskAttemptContext taskAttemptContext;
+  private RecordWriter recordWriter;
+  private final InputFormat inputFormat;
+  private final OutputFormat outputFormat;
+  private final Serialization serialization;
+  private final List<Pair<K, V>> outputs = new ArrayList<Pair<K, V>>();
+
+  public MockMapreduceOutputFormat(Job job,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class<? extends InputFormat> inputFormatClass) throws IOException {
+    this.job = job;
+
+    outputFormat = ReflectionUtils.newInstance(outputFormatClass,
+        job.getConfiguration());
+    inputFormat = ReflectionUtils.newInstance(inputFormatClass,
+        job.getConfiguration());
+    serialization = new Serialization(job.getConfiguration());
+
+    if (outputPath.exists()) {
+      throw new IllegalStateException(
+          "Generated the same random dir name twice: " + outputPath);
+    }
+    if (!outputPath.mkdir()) {
+      throw new IOException("Failed to create output dir " + outputPath);
+    }
+    FileOutputFormat.setOutputPath(job, new Path(outputPath.toString()));
+  }
+
+  private void setClassIfUnset(String name, Class<?> classType) {
+    job.getConfiguration().setIfUnset(name, classType.getName());
+  }
+
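+  // Hadoop 0.23+ moved the concrete context classes to *Impl names, while
+  // earlier releases shipped concrete TaskAttemptContext/JobContext classes;
+  // try the new name first and fall back so both API generations work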
+  private Object createObject(String primaryClassName,
+      String secondaryClassName, Class<?>[] constructorParametersClasses,
+      Object... constructorParameters) {
+    try {
+      Class<?> classType = Class.forName(primaryClassName);
+      try {
+        Constructor<?> constructor = classType
+            .getConstructor(constructorParametersClasses);
+        return constructor.newInstance(constructorParameters);
+      } catch (SecurityException e) {
+        throw new IllegalStateException(e);
+      } catch (NoSuchMethodException e) {
+        throw new IllegalStateException(e);
+      } catch (IllegalArgumentException e) {
+        throw new IllegalStateException(e);
+      } catch (InstantiationException e) {
+        throw new IllegalStateException(e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException(e);
+      } catch (InvocationTargetException e) {
+        throw new IllegalStateException(e);
+      }
+    } catch (ClassNotFoundException e) {
+      if (secondaryClassName == null) {
+        throw new IllegalStateException(e);
+      }
+      return createObject(secondaryClassName, null,
+          constructorParametersClasses, constructorParameters);
+    }
+  }
+
+  @Override
+  public void collect(K key, V value) throws IOException {
+    try {
+      if (recordWriter == null) {
+        setClassIfUnset("mapred.output.key.class", key.getClass());
+        setClassIfUnset("mapred.output.value.class", value.getClass());
+
+        taskAttemptContext = (TaskAttemptContext) createObject(
+            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",
+            "org.apache.hadoop.mapreduce.TaskAttemptContext",
+            TASK_ATTEMPT_CONTEXT_CLASSES, job.getConfiguration(), TASK_ID);
+        recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+      }
+
+      recordWriter.write(key, value);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public List<Pair<K, V>> getOutputs() throws IOException {
+    try {
+      recordWriter.close(taskAttemptContext);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
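+    // as in the mapred variant, glob several levels deep so output written
+    // under the committer's temporary directories is also found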
+    FileInputFormat.setInputPaths(job, outputPath + "/*/*/*/*");
+    try {
+      List<InputSplit> inputSplits = inputFormat
+          .getSplits((JobContext) createObject(
+              "org.apache.hadoop.mapreduce.task.JobContextImpl",
+              "org.apache.hadoop.mapreduce.JobContext", JOB_CONTEXT_CLASSES,
+              job.getConfiguration(), new JobID()));
+      for (InputSplit inputSplit : inputSplits) {
+        RecordReader<K, V> recordReader = inputFormat.createRecordReader(
+            inputSplit, taskAttemptContext);
+        recordReader.initialize(inputSplit, taskAttemptContext);
+        while (recordReader.nextKeyValue()) {
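+          // record readers may reuse key/value instances, so copy them
+          // before storing them in the output list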
+          outputs.add(new Pair<K, V>(serialization.copy(recordReader
+              .getCurrentKey()), serialization.copy(recordReader
+              .getCurrentValue())));
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    FileUtil.fullyDelete(outputPath);
+    return outputs;
+  }
+
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
index 0654f3d..46d7d26 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -43,25 +44,26 @@
  * 
  * This wrapper class exists for that purpose.
  */
-public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
-    MockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class MockReduceContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends
+    AbstractMockContextWrapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context> {
 
   protected static final Log LOG = LogFactory
       .getLog(MockReduceContextWrapper.class);
   protected final List<Pair<KEYIN, List<VALUEIN>>> inputs;
   protected Pair<KEYIN, List<VALUEIN>> currentKeyValue;
-  protected final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context;
 
   public MockReduceContextWrapper(
       final List<Pair<KEYIN, List<VALUEIN>>> inputs, final Counters counters,
-      final Configuration conf) throws IOException, InterruptedException {
-    super(counters, conf);
+      final Configuration conf,
+      OutputCollectable<KEYOUT, VALUEOUT> outputCollectable)
+      throws IOException, InterruptedException {
+    super(counters, conf, outputCollectable);
     this.inputs = inputs;
-    context = create();
   }
 
   @SuppressWarnings({ "unchecked" })
-  private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
+  protected Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create()
       throws IOException, InterruptedException {
 
     final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context = mock(org.apache.hadoop.mapreduce.Reducer.Context.class);
@@ -97,13 +99,6 @@
     return context;
   }
 
-  /**
-   * @return the outputs from the MockOutputCollector back to the test harness.
-   */
-  public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
-    return outputs;
-  }
-
   protected static <V> Iterable<V> makeOneUseIterator(final Iterator<V> parent) {
     return new Iterable<V>() {
       private final Iterator<V> iter = new Iterator<V>() {
@@ -137,8 +132,4 @@
       }
     };
   }
-
-  public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context getMockContext() {
-    return context;
-  }
 }
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockOutputCollector.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCollector.java
similarity index 89%
rename from src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockOutputCollector.java
rename to src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCollector.java
index acc01cf..de84935 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapred/MockOutputCollector.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCollector.java
@@ -15,14 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mrunit.internal.mapred;
+package org.apache.hadoop.mrunit.internal.output;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mrunit.internal.io.Serialization;
 import org.apache.hadoop.mrunit.types.Pair;
 
@@ -31,9 +30,9 @@
  * Accepts a set of output (k, v) pairs and returns them to the framework for
  * validation.
  */
-public class MockOutputCollector<K, V> implements OutputCollector<K, V> {
+public class MockOutputCollector<K, V> implements OutputCollectable<K, V> {
 
-  private final ArrayList<Pair<K, V>> collectedOutputs;
+  private final List<Pair<K, V>> collectedOutputs;
   private final Serialization serialization;
 
   public MockOutputCollector(final Configuration conf) {
@@ -54,6 +53,7 @@
   /**
    * @return The outputs generated by the mapper/reducer being tested
    */
+  @Override
   public List<Pair<K, V>> getOutputs() {
     return collectedOutputs;
   }
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
new file mode 100644
index 0000000..8ea2497
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/MockOutputCreator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.output;
+
+import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mrunit.internal.mapred.MockMapredOutputFormat;
+import org.apache.hadoop.mrunit.internal.mapreduce.MockMapreduceOutputFormat;
+
+public class MockOutputCreator<K, V> {
+
+  private Class<? extends OutputFormat> mapredOutputFormatClass;
+  private Class<? extends InputFormat> mapredInputFormatClass;
+
+  private Class<? extends org.apache.hadoop.mapreduce.OutputFormat> mapreduceOutputFormatClass;
+  private Class<? extends org.apache.hadoop.mapreduce.InputFormat> mapreduceInputFormatClass;
+
+  public void setMapredFormats(Class<? extends OutputFormat> outputFormatClass,
+      Class<? extends InputFormat> inputFormatClass) {
+    mapredOutputFormatClass = returnNonNull(outputFormatClass);
+    mapredInputFormatClass = returnNonNull(inputFormatClass);
+  }
+
+  public void setMapreduceFormats(
+      Class<? extends org.apache.hadoop.mapreduce.OutputFormat> outputFormatClass,
+      Class<? extends org.apache.hadoop.mapreduce.InputFormat> inputFormatClass) {
+    mapreduceOutputFormatClass = returnNonNull(outputFormatClass);
+    mapreduceInputFormatClass = returnNonNull(inputFormatClass);
+  }
+
+  public OutputCollectable<K, V> createOutputCollectable(Configuration conf)
+      throws IOException {
+    if (mapredOutputFormatClass != null) {
+      return new MockMapredOutputFormat<K, V>(new JobConf(conf),
+          mapredOutputFormatClass, mapredInputFormatClass);
+    }
+    if (mapreduceOutputFormatClass != null) {
+      return new MockMapreduceOutputFormat<K, V>(new Job(conf),
+          mapreduceOutputFormatClass, mapreduceInputFormatClass);
+    }
+    return new MockOutputCollector<K, V>(conf);
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/output/OutputCollectable.java b/src/main/java/org/apache/hadoop/mrunit/internal/output/OutputCollectable.java
new file mode 100644
index 0000000..24bb597
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/output/OutputCollectable.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.internal.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mrunit.types.Pair;
+
+public interface OutputCollectable<K, V> extends OutputCollector<K, V> {
+
+  List<Pair<K, V>> getOutputs() throws IOException;
+
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
index 7c7a658..1e690ba 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
@@ -28,12 +28,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mrunit.MapDriverBase;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
-import org.apache.hadoop.mrunit.internal.mapreduce.MockInputSplit;
 import org.apache.hadoop.mrunit.internal.mapreduce.MockMapContextWrapper;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -52,6 +54,8 @@
   private Mapper<K1, V1, K2, V2> myMapper;
   private Counters counters;
 
+  private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
   public MapDriver(final Mapper<K1, V1, K2, V2> m) {
     this();
     setMapper(m);
@@ -199,6 +203,13 @@
     return this;
   }
 
+  public MapDriver<K1, V1, K2, V2> withOutputFormat(
+      final Class<? extends OutputFormat> outputFormatClass,
+      final Class<? extends InputFormat> inputFormatClass) {
+    mockOutputCreator.setMapreduceFormats(outputFormatClass, inputFormatClass);
+    return this;
+  }
+
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
     if (inputKey == null || inputVal == null) {
@@ -211,15 +222,13 @@
     final List<Pair<K1, V1>> inputs = new ArrayList<Pair<K1, V1>>();
     inputs.add(new Pair<K1, V1>(inputKey, inputVal));
 
-    final InputSplit inputSplit = new MockInputSplit();
-    
     try {
+      final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+          .createOutputCollectable(getConfiguration());
       final MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper<K1, V1, K2, V2>(
-          inputs, getCounters(), getConfiguration(), inputSplit);
-
-      final Mapper<K1, V1, K2, V2>.Context context = wrapper.getMockContext();
-      myMapper.run(context);
-      return wrapper.getOutputs();
+          inputs, getCounters(), getConfiguration(), outputCollectable);
+      myMapper.run(wrapper.getMockContext());
+      return outputCollectable.getOutputs();
     } catch (final InterruptedException ie) {
       throw new IOException(ie);
     }
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index ffc024f..eaa0e76 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -28,7 +28,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mrunit.MapReduceDriverBase;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
@@ -55,6 +57,9 @@
   private Reducer<K2, V2, K2, V2> myCombiner;
   private Counters counters;
 
+  private Class<? extends OutputFormat> outputFormatClass;
+  private Class<? extends InputFormat> inputFormatClass;
+
   public MapReduceDriver(final Mapper<K1, V1, K2, V2> m,
       final Reducer<K2, V2, K3, V3> r) {
     this();
@@ -262,6 +267,14 @@
     return this;
   }
 
+  public MapReduceDriver<K1, V1, K2, V2, K3, V3> withOutputFormat(
+      final Class<? extends OutputFormat> outputFormatClass,
+      final Class<? extends InputFormat> inputFormatClass) {
+    this.outputFormatClass = returnNonNull(outputFormatClass);
+    this.inputFormatClass = returnNonNull(inputFormatClass);
+    return this;
+  }
+
   /**
    * This private class manages starting the reduce phase; it exists for
    * type-genericity reasons and is used in the run() method.
@@ -281,9 +294,14 @@
         LOG.debug("Reducing input (" + inputKey.toString() + ", "
             + sb.toString() + ")");
 
-        reduceOutputs.addAll(ReduceDriver.newReduceDriver(reducer)
-            .withCounters(getCounters()).withConfiguration(configuration)
-            .withInputKey(inputKey).withInputValues(inputValues).run());
+        final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
+            .newReduceDriver(reducer).withCounters(getCounters())
+            .withConfiguration(configuration).withInputKey(inputKey)
+            .withInputValues(inputValues);
+        if (outputFormatClass != null) {
+          reduceDriver.withOutputFormat(outputFormatClass, inputFormatClass);
+        }
+        reduceOutputs.addAll(reduceDriver.run());
       }
 
       return reduceOutputs;
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index e365ee4..4649d15 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -28,10 +28,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mrunit.ReduceDriverBase;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
 import org.apache.hadoop.mrunit.internal.mapreduce.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
+import org.apache.hadoop.mrunit.internal.output.OutputCollectable;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -52,6 +56,8 @@
   private Reducer<K1, V1, K2, V2> myReducer;
   private Counters counters;
 
+  private final MockOutputCreator<K2, V2> mockOutputCreator = new MockOutputCreator<K2, V2>();
+
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
     this();
     setReducer(r);
@@ -211,6 +217,13 @@
     return this;
   }
 
+  public ReduceDriver<K1, V1, K2, V2> withOutputFormat(
+      final Class<? extends OutputFormat> outputFormatClass,
+      final Class<? extends InputFormat> inputFormatClass) {
+    mockOutputCreator.setMapreduceFormats(outputFormatClass, inputFormatClass);
+    return this;
+  }
+
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
     if (inputKey == null || getInputValues().isEmpty()) {
@@ -224,11 +237,12 @@
     inputs.add(new Pair<K1, List<V1>>(inputKey, getInputValues()));
 
     try {
+      final OutputCollectable<K2, V2> outputCollectable = mockOutputCreator
+          .createOutputCollectable(getConfiguration());
       final MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
-          inputs, getCounters(), getConfiguration());
-      final Reducer<K1, V1, K2, V2>.Context context = wrapper.getMockContext();
-      myReducer.run(context);
-      return wrapper.getOutputs();
+          inputs, getCounters(), getConfiguration(), outputCollectable);
+      myReducer.run(wrapper.getMockContext());
+      return outputCollectable.getOutputs();
     } catch (final InterruptedException ie) {
       throw new IOException(ie);
     }
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
index e9574bc..c061f58 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapDriver.java
@@ -33,6 +33,10 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.junit.Before;
@@ -426,4 +430,23 @@
     driver.withOutput(new Text("a"), output);
     driver.runTest();
   }
+
+  @Test
+  public void testOutputFormat() {
+    driver.withOutputFormat(SequenceFileOutputFormat.class,
+        SequenceFileInputFormat.class);
+    driver.withInput(new Text("a"), new Text("1"));
+    driver.withOutput(new Text("a"), new Text("1"));
+    driver.runTest();
+  }
+
+  @Test
+  public void testOutputFormatWithMismatchInOutputClasses() {
+    final MapDriver<Text, Text, LongWritable, Text> driver = MapDriver
+        .newMapDriver(new IdentityMapper());
+    driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+    driver.withInput(new Text("a"), new Text("1"));
+    driver.withOutput(new LongWritable(), new Text("a\t1"));
+    driver.runTest();
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
index e341b60..2540cf9 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestMapReduceDriver.java
@@ -35,6 +35,10 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
@@ -441,4 +445,24 @@
     driver.addOutput(key, value);
     driver.runTest();
   }
+
+  @Test
+  public void testOutputFormat() {
+    driver.withOutputFormat(SequenceFileOutputFormat.class,
+        SequenceFileInputFormat.class);
+    driver.withInput(new Text("a"), new LongWritable(1));
+    driver.withInput(new Text("a"), new LongWritable(2));
+    driver.withOutput(new Text("a"), new LongWritable(3));
+    driver.runTest();
+  }
+
+  @Test
+  public void testOutputFormatWithMismatchInOutputClasses() {
+    final MapReduceDriver driver = this.driver;
+    driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+    driver.withInput(new Text("a"), new LongWritable(1));
+    driver.withInput(new Text("a"), new LongWritable(2));
+    driver.withOutput(new LongWritable(), new Text("a\t3"));
+    driver.runTest();
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java
index 0abbff4..fba4770 100644
--- a/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/TestReduceDriver.java
@@ -33,6 +33,11 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.LongSumReducer;
 import org.apache.hadoop.mrunit.types.Pair;
@@ -361,6 +366,28 @@
     driver.runTest();
   }
 
+  @Test
+  public void testOutputFormat() {
+    driver.withOutputFormat(SequenceFileOutputFormat.class,
+        SequenceFileInputFormat.class);
+    driver.withInputKey(new Text("a"));
+    driver.withInputValue(new LongWritable(1)).withInputValue(
+        new LongWritable(2));
+    driver.withOutput(new Text("a"), new LongWritable(3));
+    driver.runTest();
+  }
+
+  @Test
+  public void testOutputFormatWithMismatchInOutputClasses() {
+    final ReduceDriver driver = ReduceDriver.newReduceDriver(reducer);
+    driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+    driver.withInputKey(new Text("a"));
+    driver.withInputValue(new LongWritable(1)).withInputValue(
+        new LongWritable(2));
+    driver.withOutput(new LongWritable(), new Text("a\t3"));
+    driver.runTest();
+  }
+
   /**
   * Simple reducer that has custom counters that are increased on each reduce() call
    */
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
index 7eaff18..10213b2 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapDriver.java
@@ -31,7 +31,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mrunit.ExpectedSuppliedException;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.junit.Before;
@@ -299,4 +303,23 @@
     driver.addOutput(new IntWritable(2), 1);
     driver.runTest();
   }
+
+  @Test
+  public void testOutputFormat() {
+    driver.withOutputFormat(SequenceFileOutputFormat.class,
+        SequenceFileInputFormat.class);
+    driver.withInput(new Text("a"), new Text("1"));
+    driver.withOutput(new Text("a"), new Text("1"));
+    driver.runTest();
+  }
+
+  @Test
+  public void testOutputFormatWithMismatchInOutputClasses() {
+    final MapDriver<Text, Text, LongWritable, Text> driver = MapDriver
+        .newMapDriver(new Mapper());
+    driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+    driver.withInput(new Text("a"), new Text("1"));
+    driver.withOutput(new LongWritable(), new Text("a\t1"));
+    driver.runTest();
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
index 05ead70..8d1be2f 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
@@ -32,7 +32,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.mrunit.ExpectedSuppliedException;
@@ -409,4 +413,24 @@
     driver.withOutput(2, new IntWritable(1)).withOutput(3, new IntWritable(2))
         .runTest();
   }
+
+  @Test
+  public void testOutputFormat() {
+    driver.withOutputFormat(SequenceFileOutputFormat.class,
+        SequenceFileInputFormat.class);
+    driver.withInput(new Text("a"), new LongWritable(1));
+    driver.withInput(new Text("a"), new LongWritable(2));
+    driver.withOutput(new Text("a"), new LongWritable(3));
+    driver.runTest();
+  }
+
+  @Test
+  public void testOutputFormatWithMismatchInOutputClasses() {
+    final MapReduceDriver driver = this.driver;
+    driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+    driver.withInput(new Text("a"), new LongWritable(1));
+    driver.withInput(new Text("a"), new LongWritable(2));
+    driver.withOutput(new LongWritable(), new Text("a\t3"));
+    driver.runTest();
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
index 38e506e..cbb0e78 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
@@ -32,7 +32,12 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.mrunit.ExpectedSuppliedException;
@@ -371,4 +376,26 @@
     driver.withInputKey(1).withInputValue(new IntWritable(2))
         .withOutput(1, new IntWritable(2)).runTest();
   }
+
+  @Test
+  public void testOutputFormat() {
+    driver.withOutputFormat(SequenceFileOutputFormat.class,
+        SequenceFileInputFormat.class);
+    driver.withInputKey(new Text("a"));
+    driver.withInputValue(new LongWritable(1)).withInputValue(
+        new LongWritable(2));
+    driver.withOutput(new Text("a"), new LongWritable(3));
+    driver.runTest();
+  }
+
+  @Test
+  public void testOutputFormatWithMismatchInOutputClasses() {
+    final ReduceDriver driver = ReduceDriver.newReduceDriver(reducer);
+    driver.withOutputFormat(TextOutputFormat.class, TextInputFormat.class);
+    driver.withInputKey(new Text("a"));
+    driver.withInputValue(new LongWritable(1)).withInputValue(
+        new LongWritable(2));
+    driver.withOutput(new LongWritable(), new Text("a\t3"));
+    driver.runTest();
+  }
 }