TEZ-490. Rename SimpleInput / SimpleOutput to be MR specific (part of
TEZ-398). (sseth)
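
This is a straight rename: SimpleInput becomes MRInput, SimpleInputLegacy becomes MRInputLegacy, and SimpleOutput becomes MROutput, with local variables and comments updated to match. For reference, wiring the renamed classes into a task spec now looks roughly like the sketch below, which is modeled on the YarnTezDagChild hunk in this patch; the vertex names and the processorPayload variable are illustrative only and not part of the patch.

    // Describe the MR-specific input, reusing the processor's payload.
    InputDescriptor mrInputDesc =
        new InputDescriptor(MRInputLegacy.class.getName());
    mrInputDesc.setUserPayload(processorPayload);
    taskSpec.getInputs().add(new InputSpec("srcVertex", mrInputDesc, 0));

    // Describe the MR-specific output the same way.
    OutputDescriptor mrOutputDesc =
        new OutputDescriptor(MROutput.class.getName());
    mrOutputDesc.setUserPayload(processorPayload);
    taskSpec.getOutputs().add(new OutputSpec("dstVertex", mrOutputDesc, 0));
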
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index f32fa6b..1967462 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -85,8 +85,8 @@
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -497,20 +497,20 @@
 
     // FIXME need Input/Output vertices else we have this hack
     if (taskSpec.getInputs().isEmpty()) {
-      InputDescriptor simpleInputDesc =
-          new InputDescriptor(SimpleInputLegacy.class.getName());
-      simpleInputDesc.setUserPayload(
+      InputDescriptor mrInputDesc =
+          new InputDescriptor(MRInputLegacy.class.getName());
+      mrInputDesc.setUserPayload(
           taskSpec.getProcessorDescriptor().getUserPayload());
       taskSpec.getInputs().add(
-          new InputSpec("null", simpleInputDesc, 0));
+          new InputSpec("null", mrInputDesc, 0));
     }
     if (taskSpec.getOutputs().isEmpty()) {
-      OutputDescriptor simpleOutputDesc =
-          new OutputDescriptor(SimpleOutput.class.getName());
-      simpleOutputDesc.setUserPayload(
+      OutputDescriptor mrOutputDesc =
+          new OutputDescriptor(MROutput.class.getName());
+      mrOutputDesc.setUserPayload(
           taskSpec.getProcessorDescriptor().getUserPayload());
       taskSpec.getOutputs().add(
-          new OutputSpec("null", simpleOutputDesc, 0));
+          new OutputSpec("null", mrOutputDesc, 0));
     }
     String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
     conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
index 0b86a8e..2c53e75 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -187,7 +187,7 @@
 
   
   
-  // TODO NEWTEZ Move this into a common class. Also used in SImpleInput
+  // TODO NEWTEZ Move this into a common class. Also used in MRInput
   private class SimpleValueIterator implements Iterator<V> {
 
     private V value;
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index f59e836..f2b0a38 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -83,8 +83,8 @@
 //import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
 //import org.apache.tez.mapreduce.hadoop.IDConverter;
 //import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-//import org.apache.tez.mapreduce.input.SimpleInput;
-//import org.apache.tez.mapreduce.output.SimpleOutput;
+//import org.apache.tez.mapreduce.input.MRInput;
+//import org.apache.tez.mapreduce.output.MROutput;
 //import org.apache.tez.mapreduce.processor.map.MapProcessor;
 //import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 //
@@ -252,7 +252,7 @@
 //                  tezMapId, user, localConf.getJobName(), "TODO_vertexName",
 //                  mapProcessorDesc,
 //                  Collections.singletonList(new InputSpec("srcVertex", 0,
-//                      SimpleInput.class.getName())),
+//                      MRInput.class.getName())),
 //                  Collections.singletonList(new OutputSpec("tgtVertex", 0,
 //                      LocalOnFileSorterOutput.class.getName())));
 //
@@ -458,7 +458,7 @@
 //                Collections.singletonList(new InputSpec("TODO_srcVertexName",
 //                    mapIds.size(), LocalMergedInput.class.getName())),
 //                Collections.singletonList(new OutputSpec("TODO_targetVertex",
-//                    0, SimpleOutput.class.getName())));
+//                    0, MROutput.class.getName())));
 //
 //            // move map output to reduce input
 //            for (int i = 0; i < mapIds.size(); i++) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
similarity index 97%
rename from tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
rename to tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 598f801..6066d93 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -57,16 +57,16 @@
 import com.google.common.base.Preconditions;
 
 /**
- * {@link SimpleInput} is an {@link Input} which provides key/values pairs
+ * {@link MRInput} is an {@link Input} which provides key/value pairs
  * for the consumer.
  *
  * It is compatible with all standard Apache Hadoop MapReduce 
  * {@link InputFormat} implementations.
  */
 
-public class SimpleInput implements LogicalInput {
+public class MRInput implements LogicalInput {
 
-  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
+  private static final Log LOG = LogFactory.getLog(MRInput.class);
   
   
   private TezInputContext inputContext;
@@ -108,7 +108,7 @@
     this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
         thisTaskMetaInfo.getStartOffset());
     
-    // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
+    // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
     // theory, can be used by the MapProcessor, ReduceProcessor or a custom
     // processor. (The processor could provide the counter though)
     this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
@@ -201,11 +201,11 @@
   }
 
   /**
-   * SimpleInputs sets some additional parameters like split location when using
+   * {@link MRInput} sets some additional parameters like split location when using
    * the new API. This methods returns the list of additional updates, and
-   * should be used by Processors using the old MapReduce API with SimpleInput.
+   * should be used by Processors using the old MapReduce API with {@link MRInput}.
    * 
-   * @return the additional fields set by SimpleInput
+   * @return the additional fields set by {@link MRInput}
    */
   public Configuration getConfigUpdates() {
     return new Configuration(incrementalConf);
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
similarity index 95%
rename from tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
rename to tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 4e61aa7..5923746 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.mapred.RecordReader;
 
-public class SimpleInputLegacy extends SimpleInput {
+public class MRInputLegacy extends MRInput {
 
   @Private
   public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
similarity index 97%
rename from tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
rename to tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index d82c9e2..e6bdbe6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -37,9 +37,9 @@
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
-public class SimpleOutput implements LogicalOutput {
+public class MROutput implements LogicalOutput {
 
-  private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
+  private static final Log LOG = LogFactory.getLog(MROutput.class);
 
   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
   static {
@@ -295,7 +295,7 @@
   }
 
   /**
-   * SimpleOutput expects that a Processor call commit prior to the
+   * MROutput expects that a Processor calls commit prior to the
    * Processor's completion
    * @throws IOException
    */
@@ -310,7 +310,7 @@
 
 
   /**
-   * SimpleOutput expects that a Processor call abort in case of any error
+   * MROutput expects that a Processor calls abort in case of any error
    * ( including an error during commit ) prior to the Processor's completion
    * @throws IOException
    */
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index f7404d4..fac1454 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -81,7 +81,7 @@
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
 
 @SuppressWarnings("deprecation")
 public abstract class MRTask {
@@ -423,8 +423,8 @@
         + " And is in the process of committing");
     // TODO change this to use the new context
     // TODO TEZ Interaciton between Commit and OutputReady. Merge ?
-    if (output instanceof SimpleOutput) {
-      SimpleOutput sOut = (SimpleOutput)output;
+    if (output instanceof MROutput) {
+      MROutput sOut = (MROutput)output;
       if (sOut.isCommitRequired()) {
         //wait for commit approval and commit
         // TODO EVENTUALLY - Commit is not required for map tasks.
@@ -458,7 +458,7 @@
     statusUpdate();
   }
 
-  private void commit(SimpleOutput output) throws IOException {
+  private void commit(MROutput output) throws IOException {
     int retries = 3;
     while (true) {
       // This will loop till the AM asks for the task to be killed. As
@@ -495,7 +495,7 @@
   }
 
   private
-  void discardOutput(SimpleOutput output) {
+  void discardOutput(MROutput output) {
     try {
       output.abort();
     } catch (IOException ioe)  {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 22312f7..85139ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -65,7 +65,7 @@
     if (isProcessorContext) {
       ((TezProcessorContext)context).setProgress(progress);
     } else {
-      // TODO FIXME NEWTEZ - will simpleoutput's reporter use this api?
+      // TODO FIXME NEWTEZ - will MROutput's reporter use this api?
     }
   }
 
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 2084146..e4b990a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -44,9 +44,9 @@
 import org.apache.tez.engine.api.TezProcessorContext;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
-import org.apache.tez.mapreduce.input.SimpleInput;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
@@ -99,15 +99,15 @@
     LogicalOutput out = outputs.values().iterator().next();
 
     // Sanity check
-    if (!(in instanceof SimpleInputLegacy)) {
+    if (!(in instanceof MRInputLegacy)) {
       throw new IOException(new TezException(
           "Only Simple Input supported. Input: " + in.getClass()));
     }
-    SimpleInputLegacy input = (SimpleInputLegacy)in;
+    MRInputLegacy input = (MRInputLegacy)in;
 
     KVWriter kvWriter = null;
     if (!(out instanceof OnFileSortedOutput)) {
-      kvWriter = ((SimpleOutput)out).getWriter();
+      kvWriter = ((MROutput)out).getWriter();
     } else {
       kvWriter = ((OnFileSortedOutput)out).getWriter();
     }
@@ -124,13 +124,13 @@
   void runOldMapper(
       final JobConf job,
       final MRTaskReporter reporter,
-      final SimpleInputLegacy input,
+      final MRInputLegacy input,
       final KVWriter output
       ) throws IOException, InterruptedException {
 
     // Initialize input in-line since it sets parameters which may be used by the processor.
-    // Done only for SimpleInput.
-    // TODO use new method in SimpleInput to get required info
+    // Done only for MRInput.
+    // TODO use new method in MRInput to get required info
     //input.initialize(job, master);
 
     RecordReader in = new OldRecordReader(input);
@@ -147,13 +147,13 @@
 
   private void runNewMapper(final JobConf job,
       MRTaskReporter reporter,
-      final SimpleInputLegacy in,
+      final MRInputLegacy in,
       KVWriter out
       ) throws IOException, InterruptedException {
 
     // Initialize input in-line since it sets parameters which may be used by the processor.
-    // Done only for SimpleInput.
-    // TODO use new method in SimpleInput to get required info
+    // Done only for MRInput.
+    // TODO use new method in MRInput to get required info
     //in.initialize(job, master);
 
     // make a task context so we can get the classes
@@ -197,10 +197,10 @@
 
   private static class NewRecordReader extends
       org.apache.hadoop.mapreduce.RecordReader {
-    private final SimpleInput in;
+    private final MRInput in;
     private KVReader reader;
 
-    private NewRecordReader(SimpleInput in) throws IOException {
+    private NewRecordReader(MRInput in) throws IOException {
       this.in = in;
       this.reader = in.getReader();
     }
@@ -241,38 +241,38 @@
   }
 
   private static class OldRecordReader implements RecordReader {
-    private final SimpleInputLegacy simpleInput;
+    private final MRInputLegacy mrInput;
 
-    private OldRecordReader(SimpleInputLegacy simpleInput) {
-      this.simpleInput = simpleInput;
+    private OldRecordReader(MRInputLegacy mrInput) {
+      this.mrInput = mrInput;
     }
 
     @Override
     public boolean next(Object key, Object value) throws IOException {
       // TODO broken
-//      simpleInput.setKey(key);
-//      simpleInput.setValue(value);
+//      mrInput.setKey(key);
+//      mrInput.setValue(value);
 //      try {
-//        return simpleInput.hasNext();
+//        return mrInput.hasNext();
 //      } catch (InterruptedException ie) {
 //        throw new IOException(ie);
 //      }
-      return simpleInput.getOldRecordReader().next(key, value);
+      return mrInput.getOldRecordReader().next(key, value);
     }
 
     @Override
     public Object createKey() {
-      return simpleInput.getOldRecordReader().createKey();
+      return mrInput.getOldRecordReader().createKey();
     }
 
     @Override
     public Object createValue() {
-      return simpleInput.getOldRecordReader().createValue();
+      return mrInput.getOldRecordReader().createValue();
     }
 
     @Override
     public long getPos() throws IOException {
-      return simpleInput.getOldRecordReader().getPos();
+      return mrInput.getOldRecordReader().getPos();
     }
 
     @Override
@@ -282,7 +282,7 @@
     @Override
     public float getProgress() throws IOException {
       try {
-        return simpleInput.getProgress();
+        return mrInput.getProgress();
       } catch (InterruptedException ie) {
         throw new IOException(ie);
       }
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 9274765..19acb39 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
 import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
@@ -133,8 +133,8 @@
     KVReader kvReader = shuffleInput.getReader();
 
     KVWriter kvWriter = null;
-    if((out instanceof SimpleOutput)) {
-      kvWriter = ((SimpleOutput) out).getWriter();
+    if((out instanceof MROutput)) {
+      kvWriter = ((MROutput) out).getWriter();
     } else if ((out instanceof OnFileSortedOutput)) {
       kvWriter = ((OnFileSortedOutput) out).getWriter();
     } else {
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 06e2f4b..89292ab 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -48,7 +48,7 @@
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.junit.After;
@@ -120,7 +120,7 @@
     
     MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
@@ -191,7 +191,7 @@
 //            localFs, workDir, job, 0, new Path(workDir, "map0"), 
 //            new TestUmbilicalProtocol(true), vertexName, 
 //            Collections.singletonList(new InputSpec("NullVertex", 0,
-//                SimpleInput.class.getName())),
+//                MRInput.class.getName())),
 //            Collections.singletonList(new OutputSpec("FakeVertex", 1,
 //                OldInMemorySortedOutput.class.getName()))
 //            );
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index a3abd76..274c353 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -54,8 +54,8 @@
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.input.SimpleInputLegacy;
-import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.MapUtils;
 import org.junit.After;
@@ -125,7 +125,7 @@
     Path mapInput = new Path(workDir, "map0");
     MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(SimpleInputLegacy.class.getName()), 0);
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
@@ -152,7 +152,7 @@
         ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
     
     InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
-    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(SimpleOutput.class.getName()), 1);
+    OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutput.class.getName()), 1);
     
     // Now run a reduce
     TaskSpec taskSpec = new TaskSpec(