MRUNIT-129: Key object re-use in Reducer is inconsistent with MapReduce behaviour (new API)
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index eea0c60..04209c2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -923,4 +923,9 @@
       final StringBuilder sb) {
     StringUtils.formatValueList(values, sb);
   }
+
+  protected static <KEYIN, VALUEIN> void formatPairList(final List<Pair<KEYIN,VALUEIN>> pairs,
+      final StringBuilder sb) {
+    StringUtils.formatPairList(pairs, sb);
+  }
 }
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
index 0bdfd6a..d753cff 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
@@ -29,10 +29,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -51,14 +50,24 @@
 
   protected static final Log LOG = LogFactory
       .getLog(MockReduceContextWrapper.class);
-  protected final List<Pair<KEYIN, List<VALUEIN>>> inputs;
+
+  protected final List<KeyValueReuseList<KEYIN, VALUEIN>> inputs;
+
   protected final ReduceDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver;
-  
-  protected Pair<KEYIN, List<VALUEIN>> currentKeyValue;
-  
+
+  protected KeyValueReuseList<KEYIN, VALUEIN> currentKeyValue;
+
+  /**
+   * New constructor with new input format.
+   * @param configuration
+   * @param inputs
+   * @param mockOutputCreator
+   * @param driver
+   * @param unused this parameter is here only to avoid erasure collision with deprecated constructor.
+   */
   public MockReduceContextWrapper(
       final Configuration configuration,
-      final List<Pair<KEYIN, List<VALUEIN>>> inputs, 
+      final List<KeyValueReuseList<KEYIN, VALUEIN>> inputs,
       final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator,
       final ReduceDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver) {
     super(configuration, mockOutputCreator);
@@ -72,7 +81,7 @@
   protected Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create() {
 
     final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context = mock(org.apache.hadoop.mapreduce.Reducer.Context.class);
-    
+
     createCommon(context, driver, mockOutputCreator);
     try {
       /*
@@ -94,13 +103,13 @@
       when(context.getCurrentKey()).thenAnswer(new Answer<KEYIN>() {
         @Override
         public KEYIN answer(final InvocationOnMock invocation) {
-          return currentKeyValue.getFirst();
+          return currentKeyValue.getCurrentKey();
         }
       });
       when(context.getValues()).thenAnswer(new Answer<Iterable<VALUEIN>>() {
         @Override
         public Iterable<VALUEIN> answer(final InvocationOnMock invocation) {
-          return makeOneUseIterator(currentKeyValue.getSecond().iterator());
+          return makeOneUseIterator(currentKeyValue.valueIterator());
         }
       });
       when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>(){
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java b/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java
index ceef2f4..ed76f28 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java
@@ -88,4 +88,22 @@
     sb.append(")");
   }
 
+  /**
+   * Transform a list with elements (a1,a2) and (b1,b2) into a string "[(a1,a2),(b1,b2)]".
+   */
+  public static <K, V> void formatPairList(final List<Pair<K, V>> pairs,
+      final StringBuilder sb) {
+    sb.append("[");
+
+    boolean first = true;
+    for (final Pair<K, V> p : pairs) {
+      if (!first) {
+        sb.append(", ");
+      }
+      first = false;
+      sb.append("(" + p.getFirst() + ", " + p.getSecond() + ")");
+    }
+
+    sb.append("]");
+  }
 }
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index 78941c2..7ead0aa 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -21,10 +21,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mrunit.MapReduceDriverBase;
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -207,7 +210,7 @@
    */
   private class ReducePhaseRunner<OUTKEY, OUTVAL> {
     private List<Pair<OUTKEY, OUTVAL>> runReduce(
-        final List<Pair<K2, List<V2>>> inputs,
+        final List<KeyValueReuseList<K2, V2>> inputs,
         final Reducer<K2, V2, OUTKEY, OUTVAL> reducer) throws IOException {
 
       final List<Pair<OUTKEY, OUTVAL>> reduceOutputs = new ArrayList<Pair<OUTKEY, OUTVAL>>();
@@ -215,16 +218,16 @@
       if (!inputs.isEmpty()) {
         if (LOG.isDebugEnabled()) {
           final StringBuilder sb = new StringBuilder();
-          for (Pair<K2, List<V2>> input : inputs) {
-            formatValueList(input.getSecond(), sb);
-            LOG.debug("Reducing input (" + input.getFirst() + ", " + sb + ")");
+          for (List<Pair<K2, V2>> input : inputs) {
+            formatPairList(input, sb);
+            LOG.debug("Reducing input " + sb);
             sb.delete(0, sb.length());
           }
         }
 
         final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
             .newReduceDriver(reducer).withCounters(getCounters())
-            .withConfiguration(getConfiguration()).withAll(inputs);
+            .withConfiguration(getConfiguration()).withAllElements(inputs);
 
         if (getOutputSerializationConfiguration() != null) {
           reduceDriver
@@ -242,6 +245,25 @@
     }
   }
 
+  protected List<KeyValueReuseList<K2,V2>> sortAndGroup(final List<Pair<K2, V2>> mapOutputs){
+    if(mapOutputs.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    if (keyValueOrderComparator == null || keyGroupComparator == null){
+      JobConf conf = new JobConf(getConfiguration());
+      conf.setMapOutputKeyClass(mapOutputs.get(0).getFirst().getClass());
+      if (keyGroupComparator == null){
+        keyGroupComparator = conf.getOutputValueGroupingComparator();
+      }
+      if (keyValueOrderComparator == null) {
+        keyValueOrderComparator = conf.getOutputKeyComparator();
+      }
+    }
+    ReduceFeeder<K2,V2> reduceFeeder = new ReduceFeeder<K2,V2>(getConfiguration());
+    return reduceFeeder.sortAndGroup(mapOutputs, keyValueOrderComparator, keyGroupComparator);
+  }
+
   @Override
   public List<Pair<K3, V3>> run() throws IOException {
     try {
@@ -258,11 +280,11 @@
         // with the result of the combiner.
         LOG.debug("Starting combine phase with combiner: " + myCombiner);
         mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
-            shuffle(mapOutputs), myCombiner);
+            sortAndGroup(mapOutputs), myCombiner);
       }
       // Run the reduce phase.
       LOG.debug("Starting reduce phase with reducer: " + myReducer);
-      return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs),
+      return new ReducePhaseRunner<K3, V3>().runReduce(sortAndGroup(mapOutputs),
           myReducer);
     } finally {
       cleanupDistributedCache();
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index c7a15b7..aeb460e 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
 import org.apache.hadoop.mrunit.internal.mapreduce.ContextDriver;
 import org.apache.hadoop.mrunit.internal.mapreduce.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -47,9 +49,11 @@
     ReduceDriverBase<K1, V1, K2, V2, ReduceDriver<K1, V1, K2, V2>> implements
     ContextDriver {
 
+  protected List<KeyValueReuseList<K1, V1>> groupedInputs = new ArrayList<KeyValueReuseList<K1, V1>>();
+
   public static final Log LOG = LogFactory.getLog(ReduceDriver.class);
 
-  private Reducer<K1, V1, K2, V2> myReducer;
+  protected Reducer<K1, V1, K2, V2> myReducer;
   private Counters counters;
   /**
    * Context creator, do not use directly, always use the 
@@ -58,6 +62,82 @@
    */
   private MockReduceContextWrapper<K1, V1, K2, V2> wrapper;
 
+  public List<Pair<K1,V1>> getInputs(final K1 firstKey) {
+	for (KeyValueReuseList<K1, V1> p : groupedInputs) {
+      if(p.getCurrentKey().equals(firstKey)){
+        return p;
+      }
+	}
+    return null;
+  }
+
+  /**
+   * Clears the input to be sent to the Reducer
+   */
+  @Override
+  public void clearInput() {
+    super.clearInput();
+    groupedInputs.clear();
+  }
+
+  /**
+   * Add input (K*, V*) to send to the Reducer
+   *
+   * @param key
+   *          The key too add
+   * @param values
+   *          The value to add
+   */
+  public void addInput(final KeyValueReuseList<K1, V1> input) {
+    groupedInputs.add(input.clone(getConfiguration()));
+  }
+
+  /**
+   * Identical to addInput() but returns self for fluent programming style
+   *
+   * @param input
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withInput(final KeyValueReuseList<K1, V1> input) {
+    addInput(input);
+    return this;
+  }
+
+  /**
+   * Identical to addAllElements() but returns self for fluent programming style
+   *
+   * @param inputs
+   * @return this
+   */
+  public ReduceDriver<K1, V1, K2, V2> withAllElements(
+    // This method is called withAllElements to avoid erasure conflict with withAll method from ReduceDriverBase.
+      final List<KeyValueReuseList<K1, V1>> inputs) {
+    addAllElements(inputs);
+    return this;
+  }
+
+  /**
+   * Adds input to send to the Reducer
+   *
+   * @param inputs
+   *          list of (K*, V*) pairs
+   */
+  public void addAllElements(final List<KeyValueReuseList<K1, V1>> inputs) {
+    // This method is called addAllElements to avoid erasure conflict with addAll method from ReduceDriverBase.
+	for (KeyValueReuseList<K1, V1> input : inputs) {
+		addInput(input);
+	}
+  }
+
+  @Override
+  protected void printPreTestDebugLog() {
+    final StringBuilder sb = new StringBuilder();
+    for (List<Pair<K1, V1>> input : groupedInputs) {
+      formatPairList(input, sb);
+      LOG.debug("Reducing input " + sb);
+      sb.delete(0, sb.length());
+    }
+  }
 
   public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
     this();
@@ -134,6 +214,30 @@
     return this;
   }
 
+  /**
+   * Handle inputKey and inputValues and inputs for backwards compatibility.
+   */
+  @Override
+  protected void preRunChecks(Object reducer) {
+    if (inputKey != null && !getInputValues().isEmpty()) {
+      clearInput();
+      addInput(new ReduceFeeder<K1, V1>(getConfiguration()).updateInput(inputKey, getInputValues()));
+    }
+
+    if (inputs != null && !inputs.isEmpty()){
+      groupedInputs.clear();
+      groupedInputs = new ReduceFeeder<K1, V1>(getConfiguration()).updateAll(inputs);
+    }
+
+    if (groupedInputs == null || groupedInputs.isEmpty()) {
+      throw new IllegalStateException("No input was provided");
+    }
+
+    if (reducer == null) {
+      throw new IllegalStateException("No Reducer class was provided");
+    }
+  }
+
   @Override
   public List<Pair<K2, V2>> run() throws IOException {
     try {
@@ -157,11 +261,11 @@
   private MockReduceContextWrapper<K1, V1, K2, V2> getContextWrapper() {
     if(wrapper == null) {
       wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
-          getConfiguration(), inputs, mockOutputCreator, this);
+          getConfiguration(), groupedInputs, mockOutputCreator, this);
     }
     return wrapper;
   }
-  
+
   /**
    * <p>Obtain Context object for furthering mocking with Mockito.
    * For example, causing write() to throw an exception:</p>
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceFeeder.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceFeeder.java
new file mode 100644
index 0000000..c55c624
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceFeeder.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mrunit.internal.io.Serialization;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * This Class provides some methods to get inputs compatible with new API reducer.
+ *
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ */
+public class ReduceFeeder<K, V> {
+  private final Serialization serialization;
+  private final Configuration conf;
+
+  /**
+   * Constructor
+   *
+   * @param conf The driver's configuration.
+   */
+  public ReduceFeeder(Configuration conf){
+    this.conf = conf;
+    serialization = new Serialization(conf);
+  }
+
+  /**
+   * This method takes the outputs from a mapper and return the
+   * corresponding reducer input where keys have been sorted and
+   * grouped according to given comparators.
+   *
+   * If at least one comparator is null, Keys have to implements Comparable<K>.
+   *
+   * @param mapOutputs The outputs from mapper
+   * @param keyValueOrderComparator The comparator for ordering keys.
+   * @param keyGroupComparator The comparator for grouping keys
+   * @return key values sorted and grouped for the reducer
+   */
+  public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs,
+      final Comparator<K> keyValueOrderComparator,
+      final Comparator<K> keyGroupComparator){
+    if(mapOutputs.isEmpty()) {
+      return Collections.emptyList();
+    }
+    if (keyValueOrderComparator != null){
+      Collections.sort(mapOutputs, new Comparator<Pair<K,V>>(){
+        @Override
+        public int compare(Pair<K, V> o1, Pair<K, V> o2) {
+          return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
+        }
+      });
+    } else {
+      Collections.sort(mapOutputs);
+    }
+
+    List<KeyValueReuseList<K, V>> groupedInputs = new ArrayList<KeyValueReuseList<K, V>>();
+
+    K currentKey = null;
+    KeyValueReuseList<K, V> currentEntries = null;
+    for(Pair<K,V> p : mapOutputs){
+      if (currentKey == null
+                || (keyGroupComparator != null && keyGroupComparator.compare(currentKey, p.getFirst()) != 0)
+                || (keyGroupComparator == null && ((Comparable<K>)currentKey).compareTo(p.getFirst()) != 0)) {
+        currentKey = p.getFirst();
+        currentEntries = new KeyValueReuseList<K, V>(serialization.copy(p.getFirst()), serialization.copy(p.getSecond()), conf);
+        groupedInputs.add(currentEntries);
+      }
+      currentEntries.add(p);
+    }
+    return groupedInputs;
+  }
+
+  /**
+   * This method takes the outputs from a mapper and return the
+   * corresponding reducer input where keys have been sorted and
+   * grouped according to given comparator.
+   *
+   * If the comparator is null, Keys have to implements Comparable<K>.
+   *
+   * @param mapOutputs The outputs from mapper
+   * @param keyOrderAndGroupComparator The comparator for grouping and ordering keys
+   * @return key values sorted and grouped for the reducer
+   */
+  public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs,
+	      final Comparator<K> keyOrderAndGroupComparator){
+	  return sortAndGroup(mapOutputs, keyOrderAndGroupComparator, keyOrderAndGroupComparator);
+  }
+
+  /**
+   * This method takes the outputs from a mapper and return the
+   * corresponding reducer input where keys have been sorted and
+   * grouped according to their natural order.
+   *
+   * Keys have to implements Comparable<K>.
+   *
+   * @param mapOutputs The outputs from mapper
+   * @return key values sorted and grouped for the reducer
+   */
+  public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs){
+	  return sortAndGroup(mapOutputs, null, null);
+  }
+
+  /**
+   * This method takes a list of (k, v*) and return a list of (k*, v*) where k have been duplicated.
+   *
+   * @param inputs The list of key values pair with one key and multiples values
+   * @return Reducer inputs compatible with the new API
+   */
+  public List<KeyValueReuseList<K, V>> updateAll(final List<Pair<K, List<V>>> inputs) {
+    List<KeyValueReuseList<K, V>> transformedInputs = new ArrayList<KeyValueReuseList<K, V>>();
+
+    for(Pair<K, List<V>> keyValues : inputs){
+      if (!keyValues.getSecond().isEmpty()){
+        transformedInputs.add(updateInput(keyValues));
+      }
+    }
+    return transformedInputs;
+  }
+
+  /**
+   * This method takes a (k, v*) input and return a (k*, v*) where k have been duplicated.
+   *
+   * @param input The key values pair with one key for multiple values
+   * @return Reducer input compatible with the new API
+   */
+  public KeyValueReuseList<K, V> updateInput(final Pair<K, List<V>> input){
+	  return updateInput(input.getFirst(), input.getSecond());
+  }
+
+  /**
+   * This method takes a (k, v*) input and return a (k*, v*) where k have been duplicated.
+   *
+   * @param input The key values pair with one key for multiple values
+   * @return Reducer input compatible with the new API
+   */
+  public KeyValueReuseList<K, V> updateInput(final K key, List<V> values){
+    if (values.isEmpty()){
+      return new KeyValueReuseList<K, V>(serialization.copy(key), null, conf);
+    }
+    KeyValueReuseList<K, V> entry =
+      new KeyValueReuseList<K, V>(serialization.copy(key),serialization.copy(values.get(0)), conf);
+    for(V value : values){
+      entry.add(new Pair<K, V>(serialization.copy(key), serialization.copy(value)));
+    }
+    return entry;
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/types/KeyValueReuseList.java b/src/main/java/org/apache/hadoop/mrunit/types/KeyValueReuseList.java
new file mode 100644
index 0000000..5aa56ad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/types/KeyValueReuseList.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.types;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mrunit.internal.io.Serialization;
+
+/**
+ * A List of Pair<K, V> to store a reducer input (k, v)* with keys equivalent for a grouping comparator.
+ *
+ * @param <K> The type of keys
+ * @param <V> The type of Values
+ */
+public class KeyValueReuseList<K, V> extends ArrayList<Pair<K,V>>{
+  private static final long serialVersionUID = 1192028416166671770L;
+  private final K keyContainer;
+  private final V valueContainer;
+  private final Serialization serialization;
+
+  public KeyValueReuseList(K keyContainer, V valueContainer, Configuration conf){
+    super();
+    serialization = new Serialization(conf);
+    this.keyContainer = keyContainer;
+    this.valueContainer = valueContainer;
+  }
+
+  public K getCurrentKey() {
+    return keyContainer;
+  }
+
+  public KeyValueReuseList<K, V> clone(Configuration conf){
+    K key = serialization.copy(keyContainer);
+    V value = serialization.copy(valueContainer);
+    KeyValueReuseList<K, V> clone = new KeyValueReuseList<K, V>(key, value, conf);
+    for (Pair<K, V> pair : this) {
+      clone.add(new Pair<K, V>(serialization.copy(pair.getFirst()), serialization.copy(pair.getSecond())));
+    }
+    return clone;
+  }
+
+  public Iterator<V> valueIterator() {
+    return new Iterator<V>() {
+      private K key = keyContainer;
+      private V value = valueContainer;
+      private final Iterator<Pair<K, V>> delegate = iterator();
+
+      @Override
+      public boolean hasNext() {
+        return delegate.hasNext();
+      }
+
+      @Override
+      public V next() {
+        Pair<K, V> p = delegate.next();
+        key = serialization.copy(p.getFirst(), key);
+        value = serialization.copy(p.getSecond(), value);
+        return value;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java b/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java
index 27f1d67..070a743 100644
--- a/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java
+++ b/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java
@@ -224,4 +224,35 @@
     StringUtils.formatValueList(Arrays.asList("first", "second", "third"), stringBuilder);
     assertEquals("previous message (first, second, third)", stringBuilder.toString());
   }
+
+  /**
+   * Test method for {@link StringUtils#formatPairList(List, StringBuilder)}
+   */
+  @Test
+  public void shouldFormatPairListWhenEmpty() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("previous message ");
+    List<Pair<String, String>> list = new ArrayList<Pair<String, String>>();
+    StringUtils.formatPairList(list, stringBuilder);
+    assertEquals("previous message []", stringBuilder.toString());
+  }
+
+  @Test
+  public void shouldFormatPairListWithSingleElement() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("previous message ");
+    StringUtils.formatPairList(Arrays.asList(new Pair<String, String>("key", "value")), stringBuilder);
+    assertEquals("previous message [(key, value)]", stringBuilder.toString());
+  }
+
+  @Test
+  public void shouldFormatPairListWithManyElement() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("previous message ");
+    StringUtils.formatPairList(Arrays.asList(
+            new Pair<String, String>("first_key", "first_value"),
+            new Pair<String, String>("second_key", "second_value"),
+            new Pair<String, String>("third_key", "third_value")), stringBuilder);
+    assertEquals("previous message [(first_key, first_value), (second_key, second_value), (third_key, third_value)]", stringBuilder.toString());
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
index 5ce285d..5d89c38 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mrunit.TestMapReduceDriver.SecondCharComparator;
 import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
 import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.hadoop.mrunit.types.TestWritable;
 import org.junit.Before;
@@ -248,6 +249,97 @@
   }
 
   @Test
+  public void testEmptySortAndGroup() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+    assertEquals(0, outputs.size());
+  }
+
+  // just sort and group a single (k, v) pair
+  @Test
+  public void testSingleSortAndGroup() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+    sublist.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+    expected.add(sublist);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple values from the same key.
+  @Test
+  public void testSortAndGroupOneKey() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("c")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+    sublist.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+    sublist.add(new Pair<Text, Text>(new Text("a"), new Text("c")));
+    expected.add(sublist);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple keys
+  @Test
+  public void testMultiSortAndGroup1() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple keys that are out-of-order to start.
+  @Test
+  public void testMultiSortAndGroup2() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  @Test
   public void testConfiguration() throws IOException {
     final Configuration conf = new Configuration();
     conf.set("TestKey", "TestValue");
@@ -274,13 +366,14 @@
       protected void reduce(final Text key,
           final Iterable<LongWritable> values, final Context context)
           throws IOException, InterruptedException {
+        Text outKey = new Text(key);
         long outputValue = 0;
         int count = 0;
         for (final LongWritable value : values) {
           outputValue |= (value.get() << (count++ * 8));
         }
 
-        context.write(key, new LongWritable(outputValue));
+        context.write(outKey, new LongWritable(outputValue));
       }
     });
 
@@ -514,8 +607,8 @@
       .withInput(new Text("B1"),new LongWritable(1L))
       .withInput(new Text("B2"),new LongWritable(1L))
       .withInput(new Text("C1"),new LongWritable(1L))
-      .withOutput(new Text("A1"),new LongWritable(2L))
-      .withOutput(new Text("B1"),new LongWritable(2L))
+      .withOutput(new Text("A2"),new LongWritable(2L))
+      .withOutput(new Text("B2"),new LongWritable(2L))
       .withOutput(new Text("C1"),new LongWritable(1L))
       .withKeyGroupingComparator(new FirstCharComparator())
       .runTest(false);
@@ -553,14 +646,14 @@
       .withInput(new TestWritable("A2"), new Text("A2"))
       .withInput(new TestWritable("A3"), new Text("A3"))
       .withKeyGroupingComparator(new TestWritable.SingleGroupComparator())
-      // TODO: these output keys are incorrect because of MRUNIT-129 
-      .withOutput(new TestWritable("A3"), new Text("A3"))
-      .withOutput(new TestWritable("A3"), new Text("A2"))
-      .withOutput(new TestWritable("A3"), new Text("A1"))
-      //the following are the actual correct outputs
+      // these output keys are incorrect because of MRUNIT-129
       //.withOutput(new TestWritable("A3"), new Text("A3"))
-      //.withOutput(new TestWritable("A2"), new Text("A2"))
-      //.withOutput(new TestWritable("A1"), new Text("A1"))
+      //.withOutput(new TestWritable("A3"), new Text("A2"))
+      //.withOutput(new TestWritable("A3"), new Text("A1"))
+      //the following are the actual correct outputs
+      .withOutput(new TestWritable("A3"), new Text("A3"))
+      .withOutput(new TestWritable("A2"), new Text("A2"))
+      .withOutput(new TestWritable("A1"), new Text("A1"))
       .runTest(true); //ordering is important
   }
 
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
index 0366bbd..0c83edf 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
@@ -58,11 +58,13 @@
       .none();
   private Reducer<Text, LongWritable, Text, LongWritable> reducer;
   private ReduceDriver<Text, LongWritable, Text, LongWritable> driver;
+  private ReduceFeeder<Text, LongWritable> reduceFeeder;
 
   @Before
   public void setUp() throws Exception {
     reducer = new LongSumReducer<Text>();
     driver = ReduceDriver.newReduceDriver(reducer);
+    reduceFeeder = new ReduceFeeder<Text, LongWritable>(driver.getConfiguration());
   }
 
   @Test
@@ -248,7 +250,20 @@
 
     driver.withAll(inputs).withAllOutput(expected).runTest();
   }
-  
+
+  @Test
+  public void testAddAllElements() throws IOException {
+    final List<Pair<Text, LongWritable>> input = new ArrayList<Pair<Text, LongWritable>>();
+    input.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(IN_A)));
+    input.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(IN_B)));
+
+    final List<Pair<Text, LongWritable>> expected = new ArrayList<Pair<Text, LongWritable>>();
+    expected.add(new Pair<Text, LongWritable>(new Text("foo"),
+        new LongWritable(OUT_VAL)));
+
+    driver.withAllElements(reduceFeeder.sortAndGroup(input)).withAllOutput(expected).runTest();
+  }
+
   @Test
   public void testNoInput() throws IOException {
     driver = ReduceDriver.newReduceDriver();
@@ -309,13 +324,14 @@
   public void testWithCounter() throws IOException {
     final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
         .newReduceDriver();
+    ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
 
     final LinkedList<Text> values = new LinkedList<Text>();
     values.add(new Text("a"));
     values.add(new Text("b"));
 
     driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
-        .withInput(new Text("hie"), values)
+        .withInput(reduceFeeder.updateInput(new Text("hie"), values))
         .withCounter(ReducerWithCounters.Counters.COUNT, 1)
         .withCounter(ReducerWithCounters.Counters.SUM, 2)
         .withCounter("category", "count", 1).withCounter("category", "sum", 2)
@@ -326,13 +342,15 @@
   public void testWithCounterAndNoneMissing() throws IOException {
     final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
         .newReduceDriver();
+    ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
 
     final LinkedList<Text> values = new LinkedList<Text>();
     values.add(new Text("a"));
     values.add(new Text("b"));
 
     driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
-        .withInput(new Text("hie"), values).withStrictCounterChecking()
+        .withInput(reduceFeeder.updateInput(new Text("hie"), values))
+        .withStrictCounterChecking()
         .withCounter(ReducerWithCounters.Counters.COUNT, 1)
         .withCounter(ReducerWithCounters.Counters.SUM, 2)
         .withCounter("category", "count", 1).withCounter("category", "sum", 2)
@@ -343,6 +361,7 @@
   public void testWithCounterAndEnumCounterMissing() throws IOException {
     final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
         .newReduceDriver();
+    ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
 
     thrown
         .expectAssertionErrorMessage("1 Error(s): (Actual counter ("
@@ -354,7 +373,8 @@
     values.add(new Text("b"));
 
     driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
-        .withInput(new Text("hie"), values).withStrictCounterChecking()
+        .withInput(reduceFeeder.updateInput(new Text("hie"), values))
+        .withStrictCounterChecking()
         .withCounter(ReducerWithCounters.Counters.SUM, 2)
         .withCounter("category", "count", 1).withCounter("category", "sum", 2)
         .runTest();
@@ -364,6 +384,7 @@
   public void testWithCounterAndStringCounterMissing() throws IOException {
     final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
         .newReduceDriver();
+    ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
 
     thrown.expectAssertionErrorMessage("1 Error(s): (Actual counter ("
         + "\"category\",\"count\")" + " was not found in expected counters");
@@ -373,7 +394,8 @@
     values.add(new Text("b"));
 
     driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
-        .withInput(new Text("hie"), values).withStrictCounterChecking()
+        .withInput(reduceFeeder.updateInput(new Text("hie"), values))
+        .withStrictCounterChecking()
         .withCounter(ReducerWithCounters.Counters.COUNT, 1)
         .withCounter(ReducerWithCounters.Counters.SUM, 2)
         .withCounter("category", "sum", 2).runTest();
@@ -499,8 +521,9 @@
   public void testWithTaskAttemptUse() throws IOException {
     final ReduceDriver<Text,NullWritable,Text,NullWritable> driver 
       = ReduceDriver.newReduceDriver(new TaskAttemptReducer());
-    driver.withInput(new Text("anything"), Arrays.asList(NullWritable.get())).withOutput(
+    ReduceFeeder<Text, NullWritable> reduceFeeder = new ReduceFeeder<Text, NullWritable>(driver.getConfiguration());
+
+    driver.withInput(reduceFeeder.updateInput(new Text("anything"), Arrays.asList(NullWritable.get()))).withOutput(
         new Text("attempt__0000_r_000000_0"), NullWritable.get()).runTest();
   }
-
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceFeeder.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceFeeder.java
new file mode 100644
index 0000000..1ff49c9
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceFeeder.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import static org.apache.hadoop.mrunit.ExtendedAssert.assertListEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceFeeder;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReduceFeeder {
+  ReduceFeeder<Text, Text> feeder;
+  ReduceDriver<Text, Text, Text, Text> driver;
+
+  Comparator<Text> firstCharComparator = new Comparator<Text>(){
+    @Override
+    public int compare(Text arg0, Text arg1) {
+      return new Integer(arg0.charAt(0)).compareTo(new Integer(arg1.charAt(0)));
+    }
+  };
+
+  Comparator<Text> secondCharComparator = new Comparator<Text>(){
+    @Override
+    public int compare(Text arg0, Text arg1) {
+      return new Integer(arg0.charAt(1)).compareTo(new Integer(arg1.charAt(1)));
+    }
+  };
+
+  @Before
+  public void setUp() {
+	  driver = ReduceDriver.newReduceDriver();
+	  feeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
+  }
+
+  @Test
+  public void testEmptySortAndGroup() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+    assertEquals(0, outputs.size());
+  }
+
+  // just sort and group a single (k, v) pair
+  @Test
+  public void testSingleSortAndGroup() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+    expected.add(sublist);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple values from the same key.
+  @Test
+  public void testSortAndGroupOneKey() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("c")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+    sublist.add(new Pair<Text, Text>(new Text("a"), new Text("c")));
+    expected.add(sublist);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple keys
+  @Test
+  public void testMultiSortAndGroup1() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("v")));
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("v")));
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple keys that are out-of-order to start.
+  @Test
+  public void testMultiSortAndGroup2() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("v")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    inputs.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    inputs.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+    sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("v")));
+    sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  @Test
+  public void testSortAndGroupWithOneComparator() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a1"), new Text("v1")));
+    inputs.add(new Pair<Text, Text>(new Text("a2"), new Text("v2")));
+    inputs.add(new Pair<Text, Text>(new Text("c1"), new Text("v3")));
+    inputs.add(new Pair<Text, Text>(new Text("a2"), new Text("v4")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs, secondCharComparator);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist1.add(new Pair<Text, Text>(new Text("a1"), new Text("v1")));
+    sublist1.add(new Pair<Text, Text>(new Text("c1"), new Text("v3")));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist2.add(new Pair<Text, Text>(new Text("a2"), new Text("v2")));
+    sublist2.add(new Pair<Text, Text>(new Text("a2"), new Text("v4")));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  @Test
+  public void testSortAndGroupWithTwoComparators() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("a1"), new Text("v1")));
+    inputs.add(new Pair<Text, Text>(new Text("a2"), new Text("v5")));
+    inputs.add(new Pair<Text, Text>(new Text("c1"), new Text("v3")));
+    inputs.add(new Pair<Text, Text>(new Text("a2"), new Text("v2")));
+    inputs.add(new Pair<Text, Text>(new Text("a3"), new Text("v4")));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs, secondCharComparator, firstCharComparator);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist1.add(new Pair<Text, Text>(new Text("a1"), new Text("v1")));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist2.add(new Pair<Text, Text>(new Text("c1"), new Text("v3")));
+    expected.add(sublist2);
+
+    final KeyValueReuseList<Text, Text> sublist3 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    sublist3.add(new Pair<Text, Text>(new Text("a2"), new Text("v5")));
+    sublist3.add(new Pair<Text, Text>(new Text("a2"), new Text("v2")));
+    sublist3.add(new Pair<Text, Text>(new Text("a3"), new Text("v4")));
+    expected.add(sublist3);
+
+    assertListEquals(expected, outputs);
+  }
+
+  @Test
+  public void testUpdateInput() {
+    List<Text> values = new ArrayList<Text>();
+    values.add(new Text("v1"));
+    values.add(new Text("v2"));
+    values.add(new Text("v3"));
+    Text key = new Text("k");
+    Pair<Text, List<Text>> oldInput = new Pair(key, values);
+    KeyValueReuseList<Text, Text> updated = feeder.updateInput(oldInput);
+
+    KeyValueReuseList<Text, Text> expected = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    for(Text value : values){
+      expected.add(new Pair(key, value));
+    }
+
+    assertListEquals(expected, updated);
+  }
+
+  @Test
+  public void testUpdateAll() {
+    List<Text> values = new ArrayList<Text>();
+    values.add(new Text("v1"));
+    values.add(new Text("v2"));
+    values.add(new Text("v3"));
+    Text key = new Text("k");
+    List<Pair<Text, List<Text>>> oldInputs = new ArrayList<Pair<Text, List<Text>>>();
+    oldInputs.add(new Pair(key, values));
+    List<KeyValueReuseList<Text, Text>> updated = feeder.updateAll(oldInputs);
+
+    KeyValueReuseList<Text, Text> kvList = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver.getConfiguration());
+    for(Text value : values){
+      kvList.add(new Pair(key, value));
+    }
+    List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    expected.add(kvList);
+
+    assertListEquals(expected, updated);
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java
index 0809126..f1c8f21 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java
@@ -65,17 +65,17 @@
     final TestReducer reducer = new TestReducer();
     final ReduceDriver<Text, LongWritable, Text, LongWritable> driver = ReduceDriver
         .newReduceDriver();
+    final ReduceFeeder<Text, LongWritable> feeder = new ReduceFeeder<Text, LongWritable>(driver.getConfiguration());
     driver.setReducer(reducer);
     final List<LongWritable> values = new ArrayList<LongWritable>();
     values.add(new LongWritable(1));
     values.add(new LongWritable(1));
     values.add(new LongWritable(1));
     values.add(new LongWritable(1));
-    driver.withInput(new Text("foo"), values);
+    driver.withInput(feeder.updateInput(new Text("foo"), values));
     driver.withOutput(new Text("foo"), new LongWritable(4));
     driver.runTest();
     assertTrue(reducer.instanceCheckOccurred);
     assertFalse(reducer.instanceCheckFailed);
   }
-
 }
diff --git a/src/test/java/org/apache/hadoop/mrunit/types/TestKeyValueReuseList.java b/src/test/java/org/apache/hadoop/mrunit/types/TestKeyValueReuseList.java
new file mode 100644
index 0000000..c2566ce
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/mrunit/types/TestKeyValueReuseList.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.types;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceFeeder;
+import org.junit.Before;
+import org.junit.Test;
+import static org.apache.hadoop.mrunit.ExtendedAssert.*;
+
+public class TestKeyValueReuseList {
+  KeyValueReuseList<Text, Text> kvList;
+  ReduceDriver<Text, Text, Text, Text> driver;
+
+  @Before
+  public void setUp() {
+    driver = ReduceDriver.newReduceDriver();
+    ReduceFeeder<Text, Text> feeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
+
+    List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("k1"), new Text("v1")));
+    inputs.add(new Pair<Text, Text>(new Text("k2"), new Text("v2")));
+    inputs.add(new Pair<Text, Text>(new Text("k3"), new Text("v3")));
+    inputs.add(new Pair<Text, Text>(new Text("k4"), new Text("v4")));
+    kvList = feeder.sortAndGroup(inputs, new Comparator<Text>(){
+      @Override
+      public int compare(Text arg0, Text arg1) {
+        return 0;
+      }
+    }).get(0);
+  }
+
+  @Test
+  public void testValueIterator() {
+    Iterator<Text> iterator = kvList.valueIterator();
+
+    Text oldKey = null, oldValue = null;
+    for(int i = 1; i<5; i++){
+      Text currentValue = iterator.next();
+      Text currentKey = kvList.getCurrentKey();
+      Assert.assertEquals("v"+i, currentValue.toString());
+      Assert.assertEquals("k"+i, currentKey.toString());
+      if (oldKey != null && oldValue != null){
+        Assert.assertTrue( oldKey == currentKey );
+        Assert.assertTrue( oldValue == currentValue );
+      }
+      oldKey = currentKey;
+      oldValue = currentValue;
+    }
+  }
+
+  @Test
+  public void testClone() {
+    KeyValueReuseList<Text, Text> clone = kvList.clone(driver.getConfiguration());
+    Assert.assertTrue(kvList != clone);
+    assertListEquals(kvList, clone);
+
+    for (int i = 0 ; i < kvList.size() ; i++){
+      Assert.assertTrue(kvList.get(i) != clone.get(i));
+    }
+  }
+}