MRUNIT-129: Key object re-use in Reducer is inconsistent with MapReduce behaviour (new API)
diff --git a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
index eea0c60..04209c2 100644
--- a/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/TestDriver.java
@@ -923,4 +923,9 @@
final StringBuilder sb) {
StringUtils.formatValueList(values, sb);
}
+
+ protected static <KEYIN, VALUEIN> void formatPairList(final List<Pair<KEYIN,VALUEIN>> pairs,
+ final StringBuilder sb) {
+ StringUtils.formatPairList(pairs, sb);
+ }
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
index 0bdfd6a..d753cff 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/mapreduce/MockReduceContextWrapper.java
@@ -29,10 +29,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mrunit.internal.output.MockOutputCreator;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -51,14 +50,24 @@
protected static final Log LOG = LogFactory
.getLog(MockReduceContextWrapper.class);
- protected final List<Pair<KEYIN, List<VALUEIN>>> inputs;
+
+ protected final List<KeyValueReuseList<KEYIN, VALUEIN>> inputs;
+
protected final ReduceDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver;
-
- protected Pair<KEYIN, List<VALUEIN>> currentKeyValue;
-
+
+ protected KeyValueReuseList<KEYIN, VALUEIN> currentKeyValue;
+
+ /**
+ * Constructor taking the reducer inputs as KeyValueReuseList groups.
+ * @param configuration
+ * @param inputs
+ * the reducer inputs, each element being a KeyValueReuseList of (key, value) pairs
+ * @param mockOutputCreator
+ * @param driver
+ */
public MockReduceContextWrapper(
final Configuration configuration,
- final List<Pair<KEYIN, List<VALUEIN>>> inputs,
+ final List<KeyValueReuseList<KEYIN, VALUEIN>> inputs,
final MockOutputCreator<KEYOUT, VALUEOUT> mockOutputCreator,
final ReduceDriver<KEYIN, VALUEIN, KEYOUT, VALUEOUT> driver) {
super(configuration, mockOutputCreator);
@@ -72,7 +81,7 @@
protected Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context create() {
final Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context = mock(org.apache.hadoop.mapreduce.Reducer.Context.class);
-
+
createCommon(context, driver, mockOutputCreator);
try {
/*
@@ -94,13 +103,13 @@
when(context.getCurrentKey()).thenAnswer(new Answer<KEYIN>() {
@Override
public KEYIN answer(final InvocationOnMock invocation) {
- return currentKeyValue.getFirst();
+ return currentKeyValue.getCurrentKey();
}
});
when(context.getValues()).thenAnswer(new Answer<Iterable<VALUEIN>>() {
@Override
public Iterable<VALUEIN> answer(final InvocationOnMock invocation) {
- return makeOneUseIterator(currentKeyValue.getSecond().iterator());
+ return makeOneUseIterator(currentKeyValue.valueIterator());
}
});
when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>(){
diff --git a/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java b/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java
index ceef2f4..ed76f28 100644
--- a/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java
+++ b/src/main/java/org/apache/hadoop/mrunit/internal/util/StringUtils.java
@@ -88,4 +88,22 @@
sb.append(")");
}
+ /**
+ * Append a list with elements (a1,a2) and (b1,b2) to sb as "[(a1, a2), (b1, b2)]".
+ */
+ public static <K, V> void formatPairList(final List<Pair<K, V>> pairs,
+ final StringBuilder sb) {
+ sb.append("[");
+
+ boolean first = true;
+ for (final Pair<K, V> p : pairs) {
+ if (!first) {
+ sb.append(", ");
+ }
+ first = false;
+ sb.append("(" + p.getFirst() + ", " + p.getSecond() + ")");
+ }
+
+ sb.append("]");
+ }
}
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
index 78941c2..7ead0aa 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/MapReduceDriver.java
@@ -21,10 +21,12 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
@@ -32,6 +34,7 @@
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.MapReduceDriverBase;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -207,7 +210,7 @@
*/
private class ReducePhaseRunner<OUTKEY, OUTVAL> {
private List<Pair<OUTKEY, OUTVAL>> runReduce(
- final List<Pair<K2, List<V2>>> inputs,
+ final List<KeyValueReuseList<K2, V2>> inputs,
final Reducer<K2, V2, OUTKEY, OUTVAL> reducer) throws IOException {
final List<Pair<OUTKEY, OUTVAL>> reduceOutputs = new ArrayList<Pair<OUTKEY, OUTVAL>>();
@@ -215,16 +218,16 @@
if (!inputs.isEmpty()) {
if (LOG.isDebugEnabled()) {
final StringBuilder sb = new StringBuilder();
- for (Pair<K2, List<V2>> input : inputs) {
- formatValueList(input.getSecond(), sb);
- LOG.debug("Reducing input (" + input.getFirst() + ", " + sb + ")");
+ for (List<Pair<K2, V2>> input : inputs) {
+ formatPairList(input, sb);
+ LOG.debug("Reducing input " + sb);
sb.delete(0, sb.length());
}
}
final ReduceDriver<K2, V2, OUTKEY, OUTVAL> reduceDriver = ReduceDriver
.newReduceDriver(reducer).withCounters(getCounters())
- .withConfiguration(getConfiguration()).withAll(inputs);
+ .withConfiguration(getConfiguration()).withAllElements(inputs);
if (getOutputSerializationConfiguration() != null) {
reduceDriver
@@ -242,6 +245,25 @@
}
}
+ protected List<KeyValueReuseList<K2,V2>> sortAndGroup(final List<Pair<K2, V2>> mapOutputs){
+ if(mapOutputs.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ if (keyValueOrderComparator == null || keyGroupComparator == null){
+ JobConf conf = new JobConf(getConfiguration());
+ conf.setMapOutputKeyClass(mapOutputs.get(0).getFirst().getClass());
+ if (keyGroupComparator == null){
+ keyGroupComparator = conf.getOutputValueGroupingComparator();
+ }
+ if (keyValueOrderComparator == null) {
+ keyValueOrderComparator = conf.getOutputKeyComparator();
+ }
+ }
+ ReduceFeeder<K2,V2> reduceFeeder = new ReduceFeeder<K2,V2>(getConfiguration());
+ return reduceFeeder.sortAndGroup(mapOutputs, keyValueOrderComparator, keyGroupComparator);
+ }
+
@Override
public List<Pair<K3, V3>> run() throws IOException {
try {
@@ -258,11 +280,11 @@
// with the result of the combiner.
LOG.debug("Starting combine phase with combiner: " + myCombiner);
mapOutputs = new ReducePhaseRunner<K2, V2>().runReduce(
- shuffle(mapOutputs), myCombiner);
+ sortAndGroup(mapOutputs), myCombiner);
}
// Run the reduce phase.
LOG.debug("Starting reduce phase with reducer: " + myReducer);
- return new ReducePhaseRunner<K3, V3>().runReduce(shuffle(mapOutputs),
+ return new ReducePhaseRunner<K3, V3>().runReduce(sortAndGroup(mapOutputs),
myReducer);
} finally {
cleanupDistributedCache();
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
index c7a15b7..aeb460e 100644
--- a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.*;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -33,6 +34,7 @@
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
import org.apache.hadoop.mrunit.internal.mapreduce.ContextDriver;
import org.apache.hadoop.mrunit.internal.mapreduce.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.apache.hadoop.mrunit.types.Pair;
/**
@@ -47,9 +49,11 @@
ReduceDriverBase<K1, V1, K2, V2, ReduceDriver<K1, V1, K2, V2>> implements
ContextDriver {
+ protected List<KeyValueReuseList<K1, V1>> groupedInputs = new ArrayList<KeyValueReuseList<K1, V1>>();
+
public static final Log LOG = LogFactory.getLog(ReduceDriver.class);
- private Reducer<K1, V1, K2, V2> myReducer;
+ protected Reducer<K1, V1, K2, V2> myReducer;
private Counters counters;
/**
* Context creator, do not use directly, always use the
@@ -58,6 +62,82 @@
*/
private MockReduceContextWrapper<K1, V1, K2, V2> wrapper;
+ public List<Pair<K1,V1>> getInputs(final K1 firstKey) {
+ for (KeyValueReuseList<K1, V1> p : groupedInputs) {
+ if(p.getCurrentKey().equals(firstKey)){
+ return p;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Clears the input to be sent to the Reducer
+ */
+ @Override
+ public void clearInput() {
+ super.clearInput();
+ groupedInputs.clear();
+ }
+
+ /**
+ * Add input (K*, V*) to send to the Reducer
+ *
+ * @param input
+ * The list of (key, value) pairs to add
+ * (a copy made with input.clone(getConfiguration()) is stored,
+ * not the given list itself)
+ */
+ public void addInput(final KeyValueReuseList<K1, V1> input) {
+ groupedInputs.add(input.clone(getConfiguration()));
+ }
+
+ /**
+ * Identical to addInput() but returns self for fluent programming style
+ *
+ * @param input
+ * @return this
+ */
+ public ReduceDriver<K1, V1, K2, V2> withInput(final KeyValueReuseList<K1, V1> input) {
+ addInput(input);
+ return this;
+ }
+
+ /**
+ * Identical to addAllElements() but returns self for fluent programming style
+ *
+ * @param inputs
+ * @return this
+ */
+ public ReduceDriver<K1, V1, K2, V2> withAllElements(
+ // This method is called withAllElements to avoid erasure conflict with withAll method from ReduceDriverBase.
+ final List<KeyValueReuseList<K1, V1>> inputs) {
+ addAllElements(inputs);
+ return this;
+ }
+
+ /**
+ * Adds input to send to the Reducer
+ *
+ * @param inputs
+ * list of (K*, V*) pairs
+ */
+ public void addAllElements(final List<KeyValueReuseList<K1, V1>> inputs) {
+ // This method is called addAllElements to avoid erasure conflict with addAll method from ReduceDriverBase.
+ for (KeyValueReuseList<K1, V1> input : inputs) {
+ addInput(input);
+ }
+ }
+
+ @Override
+ protected void printPreTestDebugLog() {
+ final StringBuilder sb = new StringBuilder();
+ for (List<Pair<K1, V1>> input : groupedInputs) {
+ formatPairList(input, sb);
+ LOG.debug("Reducing input " + sb);
+ sb.delete(0, sb.length());
+ }
+ }
public ReduceDriver(final Reducer<K1, V1, K2, V2> r) {
this();
@@ -134,6 +214,30 @@
return this;
}
+ /**
+ * Handle inputKey and inputValues and inputs for backwards compatibility.
+ */
+ @Override
+ protected void preRunChecks(Object reducer) {
+ if (inputKey != null && !getInputValues().isEmpty()) {
+ clearInput();
+ addInput(new ReduceFeeder<K1, V1>(getConfiguration()).updateInput(inputKey, getInputValues()));
+ }
+
+ if (inputs != null && !inputs.isEmpty()){
+ groupedInputs.clear();
+ groupedInputs = new ReduceFeeder<K1, V1>(getConfiguration()).updateAll(inputs);
+ }
+
+ if (groupedInputs == null || groupedInputs.isEmpty()) {
+ throw new IllegalStateException("No input was provided");
+ }
+
+ if (reducer == null) {
+ throw new IllegalStateException("No Reducer class was provided");
+ }
+ }
+
@Override
public List<Pair<K2, V2>> run() throws IOException {
try {
@@ -157,11 +261,11 @@
private MockReduceContextWrapper<K1, V1, K2, V2> getContextWrapper() {
if(wrapper == null) {
wrapper = new MockReduceContextWrapper<K1, V1, K2, V2>(
- getConfiguration(), inputs, mockOutputCreator, this);
+ getConfiguration(), groupedInputs, mockOutputCreator, this);
}
return wrapper;
}
-
+
/**
* <p>Obtain Context object for furthering mocking with Mockito.
* For example, causing write() to throw an exception:</p>
diff --git a/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceFeeder.java b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceFeeder.java
new file mode 100644
index 0000000..c55c624
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/mapreduce/ReduceFeeder.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mrunit.internal.io.Serialization;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
+import org.apache.hadoop.mrunit.types.Pair;
+
+/**
+ * This class provides methods to build inputs compatible with a new API reducer.
+ *
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ */
+public class ReduceFeeder<K, V> {
+ private final Serialization serialization;
+ private final Configuration conf;
+
+ /**
+ * Constructor
+ *
+ * @param conf The driver's configuration.
+ */
+ public ReduceFeeder(Configuration conf){
+ this.conf = conf;
+ serialization = new Serialization(conf);
+ }
+
+ /**
+ * This method takes the outputs from a mapper and returns the
+ * corresponding reducer input where keys have been sorted and
+ * grouped according to the given comparators.
+ * Note that the given mapOutputs list is sorted in place.
+ * If at least one comparator is null, keys have to implement Comparable<K>.
+ *
+ * @param mapOutputs The outputs from mapper
+ * @param keyValueOrderComparator The comparator for ordering keys.
+ * @param keyGroupComparator The comparator for grouping keys
+ * @return key values sorted and grouped for the reducer
+ */
+ public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs,
+ final Comparator<K> keyValueOrderComparator,
+ final Comparator<K> keyGroupComparator){
+ if(mapOutputs.isEmpty()) {
+ return Collections.emptyList();
+ }
+ if (keyValueOrderComparator != null){
+ Collections.sort(mapOutputs, new Comparator<Pair<K,V>>(){
+ @Override
+ public int compare(Pair<K, V> o1, Pair<K, V> o2) {
+ return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
+ }
+ });
+ } else {
+ Collections.sort(mapOutputs);
+ }
+
+ List<KeyValueReuseList<K, V>> groupedInputs = new ArrayList<KeyValueReuseList<K, V>>();
+
+ K currentKey = null;
+ KeyValueReuseList<K, V> currentEntries = null;
+ for(Pair<K,V> p : mapOutputs){
+ if (currentKey == null
+ || (keyGroupComparator != null && keyGroupComparator.compare(currentKey, p.getFirst()) != 0)
+ || (keyGroupComparator == null && ((Comparable<K>)currentKey).compareTo(p.getFirst()) != 0)) {
+ currentKey = p.getFirst();
+ currentEntries = new KeyValueReuseList<K, V>(serialization.copy(p.getFirst()), serialization.copy(p.getSecond()), conf);
+ groupedInputs.add(currentEntries);
+ }
+ currentEntries.add(p);
+ }
+ return groupedInputs;
+ }
+
+ /**
+ * This method takes the outputs from a mapper and returns the
+ * corresponding reducer input where keys have been sorted and
+ * grouped according to the given comparator.
+ *
+ * If the comparator is null, keys have to implement Comparable<K>.
+ *
+ * @param mapOutputs The outputs from mapper
+ * @param keyOrderAndGroupComparator The comparator for grouping and ordering keys
+ * @return key values sorted and grouped for the reducer
+ */
+ public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs,
+ final Comparator<K> keyOrderAndGroupComparator){
+ return sortAndGroup(mapOutputs, keyOrderAndGroupComparator, keyOrderAndGroupComparator);
+ }
+
+ /**
+ * This method takes the outputs from a mapper and returns the
+ * corresponding reducer input where keys have been sorted and
+ * grouped according to their natural order.
+ *
+ * Keys have to implement Comparable<K>.
+ *
+ * @param mapOutputs The outputs from mapper
+ * @return key values sorted and grouped for the reducer
+ */
+ public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs){
+ return sortAndGroup(mapOutputs, null, null);
+ }
+
+ /**
+ * This method takes a list of (k, v*) and returns a list of (k*, v*) where k has been duplicated.
+ * Pairs whose list of values is empty are skipped.
+ * @param inputs The list of key values pairs with one key and multiple values
+ * @return Reducer inputs compatible with the new API
+ */
+ public List<KeyValueReuseList<K, V>> updateAll(final List<Pair<K, List<V>>> inputs) {
+ List<KeyValueReuseList<K, V>> transformedInputs = new ArrayList<KeyValueReuseList<K, V>>();
+
+ for(Pair<K, List<V>> keyValues : inputs){
+ if (!keyValues.getSecond().isEmpty()){
+ transformedInputs.add(updateInput(keyValues));
+ }
+ }
+ return transformedInputs;
+ }
+
+ /**
+ * This method takes a (k, v*) input and returns a (k*, v*) where k has been duplicated.
+ *
+ * @param input The key values pair with one key for multiple values
+ * @return Reducer input compatible with the new API
+ */
+ public KeyValueReuseList<K, V> updateInput(final Pair<K, List<V>> input){
+ return updateInput(input.getFirst(), input.getSecond());
+ }
+
+ /**
+ * This method takes a (k, v*) input and returns a (k*, v*) where k has been duplicated.
+ * @param key The key to duplicate for each value
+ * @param values The values for that key
+ * @return Reducer input compatible with the new API
+ */
+ public KeyValueReuseList<K, V> updateInput(final K key, List<V> values){
+ if (values.isEmpty()){
+ return new KeyValueReuseList<K, V>(serialization.copy(key), null, conf);
+ }
+ KeyValueReuseList<K, V> entry =
+ new KeyValueReuseList<K, V>(serialization.copy(key),serialization.copy(values.get(0)), conf);
+ for(V value : values){
+ entry.add(new Pair<K, V>(serialization.copy(key), serialization.copy(value)));
+ }
+ return entry;
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/mrunit/types/KeyValueReuseList.java b/src/main/java/org/apache/hadoop/mrunit/types/KeyValueReuseList.java
new file mode 100644
index 0000000..5aa56ad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/mrunit/types/KeyValueReuseList.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.types;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mrunit.internal.io.Serialization;
+
+/**
+ * A List of Pair<K, V> to store a reducer input (k, v)* with keys equivalent for a grouping comparator.
+ *
+ * @param <K> The type of keys
+ * @param <V> The type of Values
+ */
+public class KeyValueReuseList<K, V> extends ArrayList<Pair<K,V>>{
+ private static final long serialVersionUID = 1192028416166671770L;
+ private final K keyContainer;
+ private final V valueContainer;
+ private final Serialization serialization;
+
+ public KeyValueReuseList(K keyContainer, V valueContainer, Configuration conf){
+ super();
+ serialization = new Serialization(conf);
+ this.keyContainer = keyContainer;
+ this.valueContainer = valueContainer;
+ }
+
+ public K getCurrentKey() {
+ return keyContainer;
+ }
+
+ public KeyValueReuseList<K, V> clone(Configuration conf){
+ K key = serialization.copy(keyContainer);
+ V value = serialization.copy(valueContainer);
+ KeyValueReuseList<K, V> clone = new KeyValueReuseList<K, V>(key, value, conf);
+ for (Pair<K, V> pair : this) {
+ clone.add(new Pair<K, V>(serialization.copy(pair.getFirst()), serialization.copy(pair.getSecond())));
+ }
+ return clone;
+ }
+
+ public Iterator<V> valueIterator() {
+ return new Iterator<V>() {
+ private K key = keyContainer;
+ private V value = valueContainer;
+ private final Iterator<Pair<K, V>> delegate = iterator();
+
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public V next() {
+ Pair<K, V> p = delegate.next();
+ key = serialization.copy(p.getFirst(), key);
+ value = serialization.copy(p.getSecond(), value);
+ return value;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java b/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java
index 27f1d67..070a743 100644
--- a/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java
+++ b/src/test/java/org/apache/hadoop/mrunit/internal/util/TestStringUtils.java
@@ -224,4 +224,35 @@
StringUtils.formatValueList(Arrays.asList("first", "second", "third"), stringBuilder);
assertEquals("previous message (first, second, third)", stringBuilder.toString());
}
+
+ /**
+ * Test method for {@link StringUtils#formatPairList(List, StringBuilder)}
+ */
+ @Test
+ public void shouldFormatPairListWhenEmpty() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("previous message ");
+ List<Pair<String, String>> list = new ArrayList<Pair<String, String>>();
+ StringUtils.formatPairList(list, stringBuilder);
+ assertEquals("previous message []", stringBuilder.toString());
+ }
+
+ @Test
+ public void shouldFormatPairListWithSingleElement() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("previous message ");
+ StringUtils.formatPairList(Arrays.asList(new Pair<String, String>("key", "value")), stringBuilder);
+ assertEquals("previous message [(key, value)]", stringBuilder.toString());
+ }
+
+ @Test
+ public void shouldFormatPairListWithManyElement() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("previous message ");
+ StringUtils.formatPairList(Arrays.asList(
+ new Pair<String, String>("first_key", "first_value"),
+ new Pair<String, String>("second_key", "second_value"),
+ new Pair<String, String>("third_key", "third_value")), stringBuilder);
+ assertEquals("previous message [(first_key, first_value), (second_key, second_value), (third_key, third_value)]", stringBuilder.toString());
+ }
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
index 5ce285d..5d89c38 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestMapReduceDriver.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.mrunit.TestMapReduceDriver.SecondCharComparator;
import org.apache.hadoop.mrunit.mapreduce.TestMapDriver.ConfigurationMapper;
import org.apache.hadoop.mrunit.mapreduce.TestReduceDriver.ConfigurationReducer;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.mrunit.types.TestWritable;
import org.junit.Before;
@@ -248,6 +249,97 @@
}
@Test
+ public void testEmptySortAndGroup() {
+ final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+ final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+ assertEquals(0, outputs.size());
+ }
+
+ // just sort and group a single (k, v) pair
+ @Test
+ public void testSingleSortAndGroup() {
+ final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+
+ final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+ final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+ final KeyValueReuseList<Text, Text> sublist = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+ sublist.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+ expected.add(sublist);
+
+ assertListEquals(expected, outputs);
+ }
+
+ // sort and group multiple values from the same key.
+ @Test
+ public void testSortAndGroupOneKey() {
+ final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("c")));
+
+ final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+ final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+ final KeyValueReuseList<Text, Text> sublist = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+ sublist.add(new Pair<Text, Text>(new Text("a"), new Text("b")));
+ sublist.add(new Pair<Text, Text>(new Text("a"), new Text("c")));
+ expected.add(sublist);
+
+ assertListEquals(expected, outputs);
+ }
+
+ // sort and group multiple keys
+ @Test
+ public void testMultiSortAndGroup1() {
+ final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+ inputs.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+ inputs.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+
+ final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+ final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+ final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+ sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+ sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+ expected.add(sublist1);
+
+ final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+ sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+ sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+ expected.add(sublist2);
+
+ assertListEquals(expected, outputs);
+ }
+
+ // sort and group multiple keys that are out-of-order to start.
+ @Test
+ public void testMultiSortAndGroup2() {
+ final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+ inputs.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+ inputs.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+ inputs.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+
+ final List<KeyValueReuseList<Text, Text>> outputs = driver2.sortAndGroup(inputs);
+
+ final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+ final KeyValueReuseList<Text, Text> sublist1 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+ sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("x")));
+ sublist1.add(new Pair<Text, Text>(new Text("a"), new Text("y")));
+ expected.add(sublist1);
+
+ final KeyValueReuseList<Text, Text> sublist2 = new KeyValueReuseList<Text, Text>(new Text(), new Text(), driver2.getConfiguration());
+ sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("z")));
+ sublist2.add(new Pair<Text, Text>(new Text("b"), new Text("w")));
+ expected.add(sublist2);
+
+ assertListEquals(expected, outputs);
+ }
+
+ @Test
public void testConfiguration() throws IOException {
final Configuration conf = new Configuration();
conf.set("TestKey", "TestValue");
@@ -274,13 +366,14 @@
protected void reduce(final Text key,
final Iterable<LongWritable> values, final Context context)
throws IOException, InterruptedException {
+ Text outKey = new Text(key);
long outputValue = 0;
int count = 0;
for (final LongWritable value : values) {
outputValue |= (value.get() << (count++ * 8));
}
- context.write(key, new LongWritable(outputValue));
+ context.write(outKey, new LongWritable(outputValue));
}
});
@@ -514,8 +607,8 @@
.withInput(new Text("B1"),new LongWritable(1L))
.withInput(new Text("B2"),new LongWritable(1L))
.withInput(new Text("C1"),new LongWritable(1L))
- .withOutput(new Text("A1"),new LongWritable(2L))
- .withOutput(new Text("B1"),new LongWritable(2L))
+ .withOutput(new Text("A2"),new LongWritable(2L))
+ .withOutput(new Text("B2"),new LongWritable(2L))
.withOutput(new Text("C1"),new LongWritable(1L))
.withKeyGroupingComparator(new FirstCharComparator())
.runTest(false);
@@ -553,14 +646,14 @@
.withInput(new TestWritable("A2"), new Text("A2"))
.withInput(new TestWritable("A3"), new Text("A3"))
.withKeyGroupingComparator(new TestWritable.SingleGroupComparator())
- // TODO: these output keys are incorrect because of MRUNIT-129
- .withOutput(new TestWritable("A3"), new Text("A3"))
- .withOutput(new TestWritable("A3"), new Text("A2"))
- .withOutput(new TestWritable("A3"), new Text("A1"))
- //the following are the actual correct outputs
+ // these output keys were the incorrect ones produced before the MRUNIT-129 fix
//.withOutput(new TestWritable("A3"), new Text("A3"))
- //.withOutput(new TestWritable("A2"), new Text("A2"))
- //.withOutput(new TestWritable("A1"), new Text("A1"))
+ //.withOutput(new TestWritable("A3"), new Text("A2"))
+ //.withOutput(new TestWritable("A3"), new Text("A1"))
+ //the following are the actual correct outputs
+ .withOutput(new TestWritable("A3"), new Text("A3"))
+ .withOutput(new TestWritable("A2"), new Text("A2"))
+ .withOutput(new TestWritable("A1"), new Text("A1"))
.runTest(true); //ordering is important
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
index 0366bbd..0c83edf 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceDriver.java
@@ -58,11 +58,13 @@
.none();
private Reducer<Text, LongWritable, Text, LongWritable> reducer;
private ReduceDriver<Text, LongWritable, Text, LongWritable> driver;
+ private ReduceFeeder<Text, LongWritable> reduceFeeder;
@Before
public void setUp() throws Exception {
reducer = new LongSumReducer<Text>();
driver = ReduceDriver.newReduceDriver(reducer);
+ reduceFeeder = new ReduceFeeder<Text, LongWritable>(driver.getConfiguration());
}
@Test
@@ -248,7 +250,20 @@
driver.withAll(inputs).withAllOutput(expected).runTest();
}
-
+
+ @Test
+ public void testAddAllElements() throws IOException {
+   // Raw pairs sharing one key are grouped by the feeder, then handed to the driver in one call.
+   final List<Pair<Text, LongWritable>> rawPairs = new ArrayList<Pair<Text, LongWritable>>();
+   rawPairs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(IN_A)));
+   rawPairs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(IN_B)));
+
+   final List<Pair<Text, LongWritable>> expectedPairs = new ArrayList<Pair<Text, LongWritable>>();
+   expectedPairs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(OUT_VAL)));
+
+   driver.withAllElements(reduceFeeder.sortAndGroup(rawPairs)).withAllOutput(expectedPairs).runTest();
+ }
+
@Test
public void testNoInput() throws IOException {
driver = ReduceDriver.newReduceDriver();
@@ -309,13 +324,14 @@
public void testWithCounter() throws IOException {
final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
.newReduceDriver();
+ ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
final LinkedList<Text> values = new LinkedList<Text>();
values.add(new Text("a"));
values.add(new Text("b"));
driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
- .withInput(new Text("hie"), values)
+ .withInput(reduceFeeder.updateInput(new Text("hie"), values))
.withCounter(ReducerWithCounters.Counters.COUNT, 1)
.withCounter(ReducerWithCounters.Counters.SUM, 2)
.withCounter("category", "count", 1).withCounter("category", "sum", 2)
@@ -326,13 +342,15 @@
public void testWithCounterAndNoneMissing() throws IOException {
final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
.newReduceDriver();
+ ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
final LinkedList<Text> values = new LinkedList<Text>();
values.add(new Text("a"));
values.add(new Text("b"));
driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
- .withInput(new Text("hie"), values).withStrictCounterChecking()
+ .withInput(reduceFeeder.updateInput(new Text("hie"), values))
+ .withStrictCounterChecking()
.withCounter(ReducerWithCounters.Counters.COUNT, 1)
.withCounter(ReducerWithCounters.Counters.SUM, 2)
.withCounter("category", "count", 1).withCounter("category", "sum", 2)
@@ -343,6 +361,7 @@
public void testWithCounterAndEnumCounterMissing() throws IOException {
final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
.newReduceDriver();
+ ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
thrown
.expectAssertionErrorMessage("1 Error(s): (Actual counter ("
@@ -354,7 +373,8 @@
values.add(new Text("b"));
driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
- .withInput(new Text("hie"), values).withStrictCounterChecking()
+ .withInput(reduceFeeder.updateInput(new Text("hie"), values))
+ .withStrictCounterChecking()
.withCounter(ReducerWithCounters.Counters.SUM, 2)
.withCounter("category", "count", 1).withCounter("category", "sum", 2)
.runTest();
@@ -364,6 +384,7 @@
public void testWithCounterAndStringCounterMissing() throws IOException {
final ReduceDriver<Text, Text, Text, Text> driver = ReduceDriver
.newReduceDriver();
+ ReduceFeeder<Text, Text> reduceFeeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
thrown.expectAssertionErrorMessage("1 Error(s): (Actual counter ("
+ "\"category\",\"count\")" + " was not found in expected counters");
@@ -373,7 +394,8 @@
values.add(new Text("b"));
driver.withReducer(new ReducerWithCounters<Text, Text, Text, Text>())
- .withInput(new Text("hie"), values).withStrictCounterChecking()
+ .withInput(reduceFeeder.updateInput(new Text("hie"), values))
+ .withStrictCounterChecking()
.withCounter(ReducerWithCounters.Counters.COUNT, 1)
.withCounter(ReducerWithCounters.Counters.SUM, 2)
.withCounter("category", "sum", 2).runTest();
@@ -499,8 +521,9 @@
public void testWithTaskAttemptUse() throws IOException {
final ReduceDriver<Text,NullWritable,Text,NullWritable> driver
= ReduceDriver.newReduceDriver(new TaskAttemptReducer());
- driver.withInput(new Text("anything"), Arrays.asList(NullWritable.get())).withOutput(
+ ReduceFeeder<Text, NullWritable> reduceFeeder = new ReduceFeeder<Text, NullWritable>(driver.getConfiguration()); // local feeder: this test builds its own driver
+
+ driver.withInput(reduceFeeder.updateInput(new Text("anything"), Arrays.asList(NullWritable.get()))).withOutput(
new Text("attempt__0000_r_000000_0"), NullWritable.get()).runTest();
}
-
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceFeeder.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceFeeder.java
new file mode 100644
index 0000000..1ff49c9
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReduceFeeder.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce;
+
+import static org.apache.hadoop.mrunit.ExtendedAssert.assertListEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceFeeder;
+import org.apache.hadoop.mrunit.types.KeyValueReuseList;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ReduceFeeder}: grouping raw (key, value) pairs with
+ * {@code sortAndGroup} and converting old-style (key, value-list) inputs with
+ * {@code updateInput} / {@code updateAll}.
+ */
+public class TestReduceFeeder {
+  ReduceFeeder<Text, Text> feeder;
+  ReduceDriver<Text, Text, Text, Text> driver;
+
+  // Each comparator looks at a single character only, so distinct keys can tie.
+  Comparator<Text> firstCharComparator = new CharAtComparator(0);
+  Comparator<Text> secondCharComparator = new CharAtComparator(1);
+
+  /**
+   * Orders {@link Text} keys by the value {@link Text#charAt(int)} returns for
+   * one fixed position, ignoring the rest of the key.
+   */
+  private static final class CharAtComparator implements Comparator<Text> {
+    private final int position;
+
+    CharAtComparator(final int position) {
+      this.position = position;
+    }
+
+    @Override
+    public int compare(final Text left, final Text right) {
+      // Integer.valueOf may reuse cached boxes, unlike the deprecated new Integer(int).
+      return Integer.valueOf(left.charAt(position)).compareTo(
+          Integer.valueOf(right.charAt(position)));
+    }
+  }
+
+  @Before
+  public void setUp() {
+    driver = ReduceDriver.newReduceDriver();
+    feeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
+  }
+
+  /** Creates an empty reuse list that uses the driver's configuration. */
+  private KeyValueReuseList<Text, Text> newReuseList() {
+    return new KeyValueReuseList<Text, Text>(new Text(), new Text(),
+        driver.getConfiguration());
+  }
+
+  /** Builds a (key, value) pair out of two freshly allocated {@link Text}s. */
+  private static Pair<Text, Text> pair(final String key, final String value) {
+    return new Pair<Text, Text>(new Text(key), new Text(value));
+  }
+
+  // an empty input must produce no groups at all
+  @Test
+  public void testEmptySortAndGroup() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+    assertEquals(0, outputs.size());
+  }
+
+  // just sort and group a single (k, v) pair
+  @Test
+  public void testSingleSortAndGroup() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(pair("a", "b"));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist = newReuseList();
+    sublist.add(pair("a", "b"));
+    expected.add(sublist);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple values from the same key.
+  @Test
+  public void testSortAndGroupOneKey() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(pair("a", "b"));
+    inputs.add(pair("a", "c"));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist = newReuseList();
+    sublist.add(pair("a", "b"));
+    sublist.add(pair("a", "c"));
+    expected.add(sublist);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple keys
+  @Test
+  public void testMultiSortAndGroup1() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(pair("a", "x"));
+    inputs.add(pair("b", "v"));
+    inputs.add(pair("b", "w"));
+    inputs.add(pair("a", "y"));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = newReuseList();
+    sublist1.add(pair("a", "x"));
+    sublist1.add(pair("a", "y"));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = newReuseList();
+    sublist2.add(pair("b", "v"));
+    sublist2.add(pair("b", "w"));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // sort and group multiple keys that are out-of-order to start.
+  @Test
+  public void testMultiSortAndGroup2() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(pair("b", "v"));
+    inputs.add(pair("a", "x"));
+    inputs.add(pair("b", "w"));
+    inputs.add(pair("a", "y"));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = newReuseList();
+    sublist1.add(pair("a", "x"));
+    sublist1.add(pair("a", "y"));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = newReuseList();
+    sublist2.add(pair("b", "v"));
+    sublist2.add(pair("b", "w"));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // one comparator: keys that tie on it (same second character) end up in one group
+  @Test
+  public void testSortAndGroupWithOneComparator() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(pair("a1", "v1"));
+    inputs.add(pair("a2", "v2"));
+    inputs.add(pair("c1", "v3"));
+    inputs.add(pair("a2", "v4"));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs, secondCharComparator);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = newReuseList();
+    sublist1.add(pair("a1", "v1"));
+    sublist1.add(pair("c1", "v3"));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = newReuseList();
+    sublist2.add(pair("a2", "v2"));
+    sublist2.add(pair("a2", "v4"));
+    expected.add(sublist2);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // two comparators: expected output is ordered by second character, with adjacent keys grouped by first character
+  @Test
+  public void testSortAndGroupWithTwoComparators() {
+    final List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(pair("a1", "v1"));
+    inputs.add(pair("a2", "v5"));
+    inputs.add(pair("c1", "v3"));
+    inputs.add(pair("a2", "v2"));
+    inputs.add(pair("a3", "v4"));
+
+    final List<KeyValueReuseList<Text, Text>> outputs = feeder.sortAndGroup(inputs, secondCharComparator, firstCharComparator);
+
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    final KeyValueReuseList<Text, Text> sublist1 = newReuseList();
+    sublist1.add(pair("a1", "v1"));
+    expected.add(sublist1);
+
+    final KeyValueReuseList<Text, Text> sublist2 = newReuseList();
+    sublist2.add(pair("c1", "v3"));
+    expected.add(sublist2);
+
+    final KeyValueReuseList<Text, Text> sublist3 = newReuseList();
+    sublist3.add(pair("a2", "v5"));
+    sublist3.add(pair("a2", "v2"));
+    sublist3.add(pair("a3", "v4"));
+    expected.add(sublist3);
+
+    assertListEquals(expected, outputs);
+  }
+
+  // convert a single old-style (key, list of values) input into one reuse list
+  @Test
+  public void testUpdateInput() {
+    final List<Text> values = new ArrayList<Text>();
+    values.add(new Text("v1"));
+    values.add(new Text("v2"));
+    values.add(new Text("v3"));
+    final Text key = new Text("k");
+    final Pair<Text, List<Text>> oldInput = new Pair<Text, List<Text>>(key, values);
+    final KeyValueReuseList<Text, Text> updated = feeder.updateInput(oldInput);
+
+    final KeyValueReuseList<Text, Text> expected = newReuseList();
+    for (final Text value : values) {
+      expected.add(new Pair<Text, Text>(key, value));
+    }
+
+    assertListEquals(expected, updated);
+  }
+
+  // convert a list of old-style inputs; each one becomes its own reuse list
+  @Test
+  public void testUpdateAll() {
+    final List<Text> values = new ArrayList<Text>();
+    values.add(new Text("v1"));
+    values.add(new Text("v2"));
+    values.add(new Text("v3"));
+    final Text key = new Text("k");
+    final List<Pair<Text, List<Text>>> oldInputs = new ArrayList<Pair<Text, List<Text>>>();
+    oldInputs.add(new Pair<Text, List<Text>>(key, values));
+    final List<KeyValueReuseList<Text, Text>> updated = feeder.updateAll(oldInputs);
+
+    final KeyValueReuseList<Text, Text> kvList = newReuseList();
+    for (final Text value : values) {
+      kvList.add(new Pair<Text, Text>(key, value));
+    }
+    final List<KeyValueReuseList<Text, Text>> expected = new ArrayList<KeyValueReuseList<Text, Text>>();
+    expected.add(kvList);
+
+    assertListEquals(expected, updated);
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java
index 0809126..f1c8f21 100644
--- a/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java
+++ b/src/test/java/org/apache/hadoop/mrunit/mapreduce/TestReducerInputValueReuse.java
@@ -65,17 +65,17 @@
final TestReducer reducer = new TestReducer();
final ReduceDriver<Text, LongWritable, Text, LongWritable> driver = ReduceDriver
.newReduceDriver();
+ final ReduceFeeder<Text, LongWritable> feeder = new ReduceFeeder<Text, LongWritable>(driver.getConfiguration());
driver.setReducer(reducer);
final List<LongWritable> values = new ArrayList<LongWritable>();
values.add(new LongWritable(1));
values.add(new LongWritable(1));
values.add(new LongWritable(1));
values.add(new LongWritable(1));
- driver.withInput(new Text("foo"), values);
+ driver.withInput(feeder.updateInput(new Text("foo"), values));
driver.withOutput(new Text("foo"), new LongWritable(4));
driver.runTest();
assertTrue(reducer.instanceCheckOccurred);
assertFalse(reducer.instanceCheckFailed);
}
-
}
diff --git a/src/test/java/org/apache/hadoop/mrunit/types/TestKeyValueReuseList.java b/src/test/java/org/apache/hadoop/mrunit/types/TestKeyValueReuseList.java
new file mode 100644
index 0000000..c2566ce
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/mrunit/types/TestKeyValueReuseList.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.types;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceFeeder;
+import org.junit.Before;
+import org.junit.Test;
+import static org.apache.hadoop.mrunit.ExtendedAssert.*;
+
+/**
+ * Unit tests for {@link KeyValueReuseList}: object re-use during iteration and cloning.
+ */
+public class TestKeyValueReuseList {
+  KeyValueReuseList<Text, Text> kvList;
+  ReduceDriver<Text, Text, Text, Text> driver;
+
+  /**
+   * Puts four distinct (key, value) pairs into a single group by sorting and
+   * grouping with a comparator that treats every key as equal.
+   */
+  @Before
+  public void setUp() {
+    driver = ReduceDriver.newReduceDriver();
+    ReduceFeeder<Text, Text> feeder = new ReduceFeeder<Text, Text>(driver.getConfiguration());
+
+    List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
+    inputs.add(new Pair<Text, Text>(new Text("k1"), new Text("v1")));
+    inputs.add(new Pair<Text, Text>(new Text("k2"), new Text("v2")));
+    inputs.add(new Pair<Text, Text>(new Text("k3"), new Text("v3")));
+    inputs.add(new Pair<Text, Text>(new Text("k4"), new Text("v4")));
+    kvList = feeder.sortAndGroup(inputs, new Comparator<Text>() {
+      @Override
+      public int compare(Text arg0, Text arg1) {
+        return 0;
+      }
+    }).get(0);
+  }
+
+  /**
+   * Walking the value iterator must expose each pair's key and value in input
+   * order while handing back the same key and value instances on every step.
+   */
+  @Test
+  public void testValueIterator() {
+    Iterator<Text> iterator = kvList.valueIterator();
+
+    Text oldKey = null;
+    Text oldValue = null;
+    for (int i = 1; i < 5; i++) {
+      Text currentValue = iterator.next();
+      Text currentKey = kvList.getCurrentKey();
+      Assert.assertEquals("v" + i, currentValue.toString());
+      Assert.assertEquals("k" + i, currentKey.toString());
+      if (oldKey != null && oldValue != null) {
+        Assert.assertSame(oldKey, currentKey);
+        Assert.assertSame(oldValue, currentValue);
+      }
+      oldKey = currentKey;
+      oldValue = currentValue;
+    }
+  }
+
+  /**
+   * A clone must be equal to the original list, yet neither the list nor any
+   * of its elements may be the same instance as in the original.
+   */
+  @Test
+  public void testClone() {
+    KeyValueReuseList<Text, Text> clone = kvList.clone(driver.getConfiguration());
+    Assert.assertNotSame(kvList, clone);
+    assertListEquals(kvList, clone);
+
+    for (int i = 0; i < kvList.size(); i++) {
+      Assert.assertNotSame(kvList.get(i), clone.get(i));
+    }
+  }
+}