blob: 1ea060853d058eed0ce190e1782816b60a4e1181 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mrunit.mapreduce;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mrunit.internal.io.Serialization;
import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.apache.hadoop.mrunit.types.Pair;
/**
* This Class provides some methods to get inputs compatible with new API reducer.
*
* @param <K> The type of the keys.
* @param <V> The type of the values.
*/
public class ReduceFeeder<K, V> {
private final Serialization serialization;
private final Configuration conf;
/**
* Constructor
*
* @param conf The driver's configuration.
*/
public ReduceFeeder(Configuration conf){
this.conf = conf;
serialization = new Serialization(conf);
}
/**
* This method takes the outputs from a mapper and return the
* corresponding reducer input where keys have been sorted and
* grouped according to given comparators.
*
* If at least one comparator is null, Keys have to implements Comparable<K>.
*
* @param mapOutputs The outputs from mapper
* @param keyValueOrderComparator The comparator for ordering keys.
* @param keyGroupComparator The comparator for grouping keys
* @return key values sorted and grouped for the reducer
*/
public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs,
final Comparator<K> keyValueOrderComparator,
final Comparator<K> keyGroupComparator){
if(mapOutputs.isEmpty()) {
return Collections.emptyList();
}
if (keyValueOrderComparator != null){
Collections.sort(mapOutputs, new Comparator<Pair<K,V>>(){
@Override
public int compare(Pair<K, V> o1, Pair<K, V> o2) {
return keyValueOrderComparator.compare(o1.getFirst(), o2.getFirst());
}
});
} else {
Collections.sort(mapOutputs);
}
List<KeyValueReuseList<K, V>> groupedInputs = new ArrayList<KeyValueReuseList<K, V>>();
K currentKey = null;
KeyValueReuseList<K, V> currentEntries = null;
for(Pair<K,V> p : mapOutputs){
if (currentKey == null
|| (keyGroupComparator != null && keyGroupComparator.compare(currentKey, p.getFirst()) != 0)
|| (keyGroupComparator == null && ((Comparable<K>)currentKey).compareTo(p.getFirst()) != 0)) {
currentKey = p.getFirst();
currentEntries = new KeyValueReuseList<K, V>(serialization.copy(p.getFirst()), serialization.copy(p.getSecond()), conf);
groupedInputs.add(currentEntries);
}
currentEntries.add(p);
}
return groupedInputs;
}
/**
* This method takes the outputs from a mapper and return the
* corresponding reducer input where keys have been sorted and
* grouped according to given comparator.
*
* If the comparator is null, Keys have to implements Comparable<K>.
*
* @param mapOutputs The outputs from mapper
* @param keyOrderAndGroupComparator The comparator for grouping and ordering keys
* @return key values sorted and grouped for the reducer
*/
public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs,
final Comparator<K> keyOrderAndGroupComparator){
return sortAndGroup(mapOutputs, keyOrderAndGroupComparator, keyOrderAndGroupComparator);
}
/**
* This method takes the outputs from a mapper and return the
* corresponding reducer input where keys have been sorted and
* grouped according to their natural order.
*
* Keys have to implements Comparable<K>.
*
* @param mapOutputs The outputs from mapper
* @return key values sorted and grouped for the reducer
*/
public List<KeyValueReuseList<K,V>> sortAndGroup(final List<Pair<K, V>> mapOutputs){
return sortAndGroup(mapOutputs, null, null);
}
/**
* This method takes a list of (k, v*) and return a list of (k*, v*) where k have been duplicated.
*
* @param inputs The list of key values pair with one key and multiples values
* @return Reducer inputs compatible with the new API
*/
public List<KeyValueReuseList<K, V>> updateAll(final List<Pair<K, List<V>>> inputs) {
List<KeyValueReuseList<K, V>> transformedInputs = new ArrayList<KeyValueReuseList<K, V>>();
for(Pair<K, List<V>> keyValues : inputs){
if (!keyValues.getSecond().isEmpty()){
transformedInputs.add(updateInput(keyValues));
}
}
return transformedInputs;
}
/**
* This method takes a (k, v*) input and return a (k*, v*) where k have been duplicated.
*
* @param input The key values pair with one key for multiple values
* @return Reducer input compatible with the new API
*/
public KeyValueReuseList<K, V> updateInput(final Pair<K, List<V>> input){
return updateInput(input.getFirst(), input.getSecond());
}
/**
* This method takes a (k, v*) input and return a (k*, v*) where k have been duplicated.
*
* @param key the key for all the values
* @param values multiples values for the same key
* @return Reducer input compatible with the new API
*/
public KeyValueReuseList<K, V> updateInput(final K key, List<V> values){
if (values.isEmpty()){
return new KeyValueReuseList<K, V>(serialization.copy(key), null, conf);
}
KeyValueReuseList<K, V> entry =
new KeyValueReuseList<K, V>(serialization.copy(key),serialization.copy(values.get(0)), conf);
for(V value : values){
entry.add(new Pair<K, V>(serialization.copy(key), serialization.copy(value)));
}
return entry;
}
}