blob: a77ad08b3ff67af2e917c8c490b6b67b2f2b9350 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mrunit.mapreduce;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mrunit.internal.counters.CounterWrapper;
import org.apache.hadoop.mrunit.internal.driver.MultipleInputsMapReduceDriverBase;
import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.apache.hadoop.mrunit.types.Pair;
import java.io.IOException;
import java.util.*;
import static org.apache.hadoop.mrunit.internal.util.ArgumentChecker.returnNonNull;
/**
* Harness that allows you to test multiple Mappers and a Reducer instance
* together (along with an optional combiner). You provide the input keys and
* values that should be sent to each Mapper, and outputs you expect to be sent
* by the Reducer to the collector for those inputs. By calling runTest(), the
* harness will deliver the inputs to the respective Mappers, feed the
* intermediate results to the Reducer (without checking them), and will check
* the Reducer's outputs against the expected results.
*
* If a combiner is specified, it will run exactly once after all the Mappers
* and before the Reducer
*
* @param <K1>
* The common map output key type
* @param <V1>
* The common map output value type
* @param <K2>
* The reduce output key type
* @param <V2>
* The reduce output value type
*/
public class MultipleInputsMapReduceDriver<K1, V1, K2, V2>
extends
MultipleInputsMapReduceDriverBase<Mapper, K1, V1, K2, V2, MultipleInputsMapReduceDriver<K1, V1, K2, V2>> {
public static final Log LOG = LogFactory
.getLog(MultipleInputsMapReduceDriver.class);
private Set<Mapper> mappers = new HashSet<Mapper>();
/**
* Add a mapper to use with this test driver
*
* @param mapper
* The mapper instance to add
* @param <K>
* The input key type to the mapper
* @param <V>
* The input value type to the mapper
*/
public <K, V> void addMapper(final Mapper<K, V, K1, V1> mapper) {
this.mappers.add(returnNonNull(mapper));
}
/**
* Identical to addMapper but supports a fluent programming style
*
* @param mapper
* The mapper instance to add
* @param <K>
* The input key type to the mapper
* @param <V>
* The input value type to the mapper
* @return this
*/
public <K, V> MultipleInputsMapReduceDriver<K1, V1, K2, V2> withMapper(
final Mapper<K, V, K1, V1> mapper) {
addMapper(mapper);
return this;
}
/**
* @return The Mapper instances being used by this test
*/
public Collection<Mapper> getMappers() {
return mappers;
}
private Reducer<K1, V1, K1, V1> combiner;
/**
* Set the combiner to use with this test driver
*
* @param combiner
* The combiner instance to use
*/
public void setCombiner(final Reducer<K1, V1, K1, V1> combiner) {
this.combiner = returnNonNull(combiner);
}
/**
* Identical to setCombiner but supports a fluent programming style
*
* @param combiner
* The combiner instance to use
* @return this
*/
public MultipleInputsMapReduceDriver<K1, V1, K2, V2> withCombiner(
final Reducer<K1, V1, K1, V1> combiner) {
setCombiner(combiner);
return this;
}
/**
* @return The combiner instance being used by this test
*/
public Reducer<K1, V1, K1, V1> getCombiner() {
return combiner;
}
private Reducer<K1, V1, K2, V2> reducer;
/**
* Set the reducer to use with this test driver
*
* @param reducer
* The reducer instance to use
*/
public void setReducer(final Reducer<K1, V1, K2, V2> reducer) {
this.reducer = returnNonNull(reducer);
}
/**
* Identical to setReducer but supports a fluent programming style
*
* @param reducer
* The reducer instance to use
* @return this
*/
public MultipleInputsMapReduceDriver<K1, V1, K2, V2> withReducer(
final Reducer<K1, V1, K2, V2> reducer) {
setReducer(reducer);
return this;
}
/**
* @return Get the reducer instance being used by this test
*/
public Reducer<K1, V1, K2, V2> getReducer() {
return reducer;
}
private Counters counters;
/**
* @return The counters used in this test
*/
public Counters getCounters() {
return counters;
}
/**
* Sets the counters object to use for this test
*
* @param counters
* The counters object to use
*/
public void setCounters(Counters counters) {
this.counters = counters;
counterWrapper = new CounterWrapper(counters);
}
/**
* Identical to setCounters but supports a fluent programming style
*
* @param counters
* The counters object to use
* @return this
*/
public MultipleInputsMapReduceDriver<K1, V1, K2, V2> withCounter(
Counters counters) {
setCounters(counters);
return this;
}
private Class<? extends OutputFormat> outputFormatClass;
/**
* Configure {@link Reducer} to output with a real {@link OutputFormat}.
*
* @param outputFormatClass
* The OutputFormat class
* @return this
*/
public MultipleInputsMapReduceDriver<K1, V1, K2, V2> withOutputFormat(
final Class<? extends OutputFormat> outputFormatClass) {
this.outputFormatClass = returnNonNull(outputFormatClass);
return this;
}
private Class<? extends InputFormat> inputFormatClass;
/**
* Set the InputFormat
*
* @param inputFormatClass
* The InputFormat class
* @return this
*/
public MultipleInputsMapReduceDriver<K1, V1, K2, V2> withInputFormat(
final Class<? extends InputFormat> inputFormatClass) {
this.inputFormatClass = returnNonNull(inputFormatClass);
return this;
}
/**
* Construct a driver with the specified Reducer. Note that a Combiner can be
* set separately.
*
* @param reducer
* The reducer to use
*/
public MultipleInputsMapReduceDriver(Reducer<K1, V1, K2, V2> reducer) {
this();
this.reducer = reducer;
}
/**
* Construct a driver with the specified Combiner and Reducers
*
* @param combiner
* The combiner to use
* @param reducer
* The reducer to use
*/
public MultipleInputsMapReduceDriver(Reducer<K1, V1, K1, V1> combiner,
Reducer<K1, V1, K2, V2> reducer) {
this(reducer);
this.combiner = combiner;
}
/**
* Construct a driver without specifying a Combiner nor a Reducer. Note that
* these can be set with the appropriate set methods and that at least the
* Reducer must be set.
*/
public MultipleInputsMapReduceDriver() {
setCounters(new Counters());
}
/**
* Static factory-style method to construct a driver instance with the
* specified Combiner and Reducer
*
* @param combiner
* The combiner to use
* @param reducer
* The reducer to use
* @param <K1>
* The common output key type of the mappers
* @param <V1>
* The common output value type of the mappers
* @param <K2>
* The output key type of the reducer
* @param <V2>
* The output value type of the reducer
* @return this to support fluent programming style
*/
public static <K1, V1, K2, V2> MultipleInputsMapReduceDriver<K1, V1, K2, V2> newMultipleInputMapReduceDriver(
final Reducer<K1, V1, K1, V1> combiner,
final Reducer<K1, V1, K2, V2> reducer) {
return new MultipleInputsMapReduceDriver<K1, V1, K2, V2>(combiner, reducer);
}
/**
* Static factory-style method to construct a driver instance with the
* specified Reducer
*
* @param reducer
* The reducer to use
* @param <K1>
* The common output key type of the mappers
* @param <V1>
* The common output value type of the mappers
* @param <K2>
* The output key type of the reducer
* @param <V2>
* The output value type of the reducer
* @return this to support fluent programming style
*/
public static <K1, V1, K2, V2> MultipleInputsMapReduceDriver<K1, V1, K2, V2> newMultipleInputMapReduceDriver(
final Reducer<K1, V1, K2, V2> reducer) {
return new MultipleInputsMapReduceDriver<K1, V1, K2, V2>(reducer);
}
/**
* Static factory-style method to construct a driver instance without
* specifying a Combiner nor a Reducer. Note that these can be set separately
* by using the appropriate set (or with) methods and that at least a Reducer
* must be set
*
* @param <K1>
* The common output key type of the mappers
* @param <V1>
* The common output value type of the mappers
* @param <K2>
* The output key type of the reducer
* @param <V2>
* The output value type of the reducer
* @return this to support fluent programming style
*/
public static <K1, V1, K2, V2> MultipleInputsMapReduceDriver<K1, V1, K2, V2> newMultipleInputMapReduceDriver() {
return new MultipleInputsMapReduceDriver<K1, V1, K2, V2>();
}
/**
* Add the specified (key, val) pair to the specified mapper
*
* @param mapper
* The mapper to add the input pair to
* @param key
* The key
* @param val
* The value
* @param <K>
* The type of the key
* @param <V>
* The type of the value
*/
public <K, V> void addInput(final Mapper<K, V, K1, V1> mapper, final K key,
final V val) {
super.addInput(mapper, key, val);
}
/**
* Add the specified input pair to the specified mapper
*
* @param mapper
* The mapper to add the input pair to
* @param input
* The (k,v) pair to add
* @param <K>
* The type of the key
* @param <V>
* The type of the value
*/
public <K, V> void addInput(final Mapper<K, V, K1, V1> mapper,
final Pair<K, V> input) {
super.addInput(mapper, input);
}
/**
* Add the specified input pairs to the specified mapper
*
* @param mapper
* The mapper to add the input pairs to
* @param inputs
* The (k, v) pairs to add
* @param <K>
* The type of the key
* @param <V>
* The type of the value
*/
public <K, V> void addAll(final Mapper<K, V, K1, V1> mapper,
final List<Pair<K, V>> inputs) {
super.addAll(mapper, inputs);
}
/**
* Identical to addInput but supports fluent programming style
*
* @param mapper
* The mapper to add the input pair to
* @param key
* The key
* @param val
* The value
* @param <K>
* The type of the key
* @param <V>
* The type of the value
* @return this
*/
public <K, V> MultipleInputsMapReduceDriver<K1, V1, K2, V2> withInput(
final Mapper<K, V, K1, V1> mapper, final K key, final V val) {
return super.withInput(mapper, key, val);
}
/**
* Identical to addInput but supports fluent programming style
*
* @param mapper
* The mapper to add the input pairs to
* @param inputs
* The (k, v) pairs to add
* @param <K>
* The type of the key
* @param <V>
* The type of the value
* @return this
*/
public <K, V> MultipleInputsMapReduceDriver<K1, V1, K2, V2> withInput(
final Mapper<K, V, K1, V1> mapper, final Pair<K, V> input) {
return super.withInput(mapper, input);
}
/**
* Identical to addInput but supports fluent programming style
*
* @param mapper
* The mapper to add the input pairs to
* @param inputs
* The (k, v) pairs to add
* @param <K>
* The type of the key
* @param <V>
* The type of the value
* @return this
*/
public <K, V> MultipleInputsMapReduceDriver<K1, V1, K2, V2> withAll(
final Mapper<K, V, K1, V1> mapper, final List<Pair<K, V>> inputs) {
return super.withAll(mapper, inputs);
}
@Override
protected void preRunChecks(Set<Mapper> mappers, Object reducer) {
if (mappers.isEmpty()) {
throw new IllegalStateException("No mappers were provided");
}
super.preRunChecks(mappers, reducer);
}
protected List<KeyValueReuseList<K1, V1>> sortAndGroup(
final List<Pair<K1, V1>> mapOutputs) {
if (mapOutputs.isEmpty()) {
return Collections.emptyList();
}
if (keyValueOrderComparator == null || keyGroupComparator == null) {
JobConf conf = new JobConf(getConfiguration());
conf.setMapOutputKeyClass(mapOutputs.get(0).getFirst().getClass());
if (keyGroupComparator == null) {
keyGroupComparator = conf.getOutputValueGroupingComparator();
}
if (keyValueOrderComparator == null) {
keyValueOrderComparator = conf.getOutputKeyComparator();
}
}
ReduceFeeder<K1, V1> reduceFeeder = new ReduceFeeder<K1, V1>(
getConfiguration());
return reduceFeeder.sortAndGroup(mapOutputs, keyValueOrderComparator,
keyGroupComparator);
}
@SuppressWarnings("unchecked")
@Override
public List<Pair<K2, V2>> run() throws IOException {
try {
preRunChecks(mappers, reducer);
initDistributedCache();
List<Pair<K1, V1>> outputs = new ArrayList<Pair<K1, V1>>();
for (Mapper mapper : mappers) {
MapDriver mapDriver = MapDriver.newMapDriver(mapper);
mapDriver.setCounters(counters);
mapDriver.setConfiguration(getConfiguration());
mapDriver.addAll(inputs.get(mapper));
mapDriver.withMapInputPath(getMapInputPath(mapper));
outputs.addAll(mapDriver.run());
}
if (combiner != null) {
LOG.debug("Starting combine phase with combiner: " + combiner);
outputs = new ReducePhaseRunner<K1, V1, K1, V1>(inputFormatClass,
getConfiguration(), counters,
getOutputSerializationConfiguration(), outputFormatClass)
.runReduce(sortAndGroup(outputs), combiner);
}
LOG.debug("Starting reduce phase with reducer: " + reducer);
return new ReducePhaseRunner<K1, V1, K2, V2>(inputFormatClass,
getConfiguration(), counters, getOutputSerializationConfiguration(),
outputFormatClass).runReduce(sortAndGroup(outputs), reducer);
} finally {
cleanupDistributedCache();
}
}
}