blob: 3527940a904423c6f056b776b8d4ff91ff871890 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mrunit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mrunit.types.Pair;
/**
* Harness that allows you to test a dataflow through a set of Mappers and
* Reducers. You provide a set of (Mapper, Reducer) "jobs" that make up
* a workflow, as well as a set of (key, value) pairs to pass in to the first
* Mapper. You can also specify the outputs you expect to be sent to the final
* Reducer in the pipeline.
*
* By calling runTest(), the harness will deliver the input to the first
* Mapper, feed the intermediate results to the first Reducer (without checking
* them), and proceed to forward this data along to subsequent Mapper/Reducer
* jobs in the pipeline until the final Reducer. The last Reducer's outputs are
* checked against the expected results.
*
* This is designed for slightly more complicated integration tests than the
* MapReduceDriver, which is for smaller unit tests.
*
* (K1, V1) in the type signature refer to the types associated with the inputs
* to the first Mapper. (K2, V2) refer to the types associated with the final
* Reducer's output. No intermediate types are specified.
*/
public class PipelineMapReduceDriver<K1, V1, K2, V2>
extends TestDriver<K1, V1, K2, V2> {
public static final Log LOG = LogFactory.getLog(PipelineMapReduceDriver.class);
private List<Pair<Mapper, Reducer>> mapReducePipeline;
private List<Pair<K1, V1>> inputList;
private Counters counters;
public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
this.mapReducePipeline = copyMapReduceList(pipeline);
this.inputList = new ArrayList<Pair<K1, V1>>();
this.counters = new Counters();
}
public PipelineMapReduceDriver() {
this.mapReducePipeline = new ArrayList<Pair<Mapper, Reducer>>();
this.inputList = new ArrayList<Pair<K1, V1>>();
this.counters = new Counters();
}
private List<Pair<Mapper, Reducer>> copyMapReduceList(List<Pair<Mapper, Reducer>> lst) {
List<Pair<Mapper, Reducer>> outList = new ArrayList<Pair<Mapper, Reducer>>();
for (Pair<Mapper, Reducer> p : lst) {
// Take advantage of the fact that Pair is immutable.
outList.add(p);
}
return outList;
}
/** @return the counters used in this test */
public Counters getCounters() {
return counters;
}
/** Sets the counters object to use for this test.
* @param ctrs The counters object to use.
*/
public void setCounters(final Counters ctrs) {
this.counters = ctrs;
}
/** Sets the counters to use and returns self for fluent style */
public PipelineMapReduceDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
setCounters(ctrs);
return this;
}
/** Add a Mapper and Reducer instance to the pipeline to use with this test driver
* @param m The Mapper instance to add to the pipeline
* @param r The Reducer instance to add to the pipeline
*/
public void addMapReduce(Mapper m, Reducer r) {
Pair<Mapper, Reducer> p = new Pair<Mapper, Reducer>(m, r);
this.mapReducePipeline.add(p);
}
/** Add a Mapper and Reducer instance to the pipeline to use with this test driver
* @param p The Mapper and Reducer instances to add to the pipeline
*/
public void addMapReduce(Pair<Mapper, Reducer> p) {
this.mapReducePipeline.add(p);
}
/** Add a Mapper and Reducer instance to the pipeline to use with this test driver
* using fluent style
* @param m The Mapper instance to use
* @param r The Reducer instance to use
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withMapReduce(Mapper m, Reducer r) {
addMapReduce(m, r);
return this;
}
/** Add a Mapper and Reducer instance to the pipeline to use with this test driver
* using fluent style
* @param p The Mapper and Reducer instances to add to the pipeline
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withMapReduce(Pair<Mapper, Reducer> p) {
addMapReduce(p);
return this;
}
/**
* @return A copy of the list of Mapper and Reducer objects under test
*/
public List<Pair<Mapper, Reducer>> getMapReducePipeline() {
return copyMapReduceList(this.mapReducePipeline);
}
/**
* Adds an input to send to the mapper
* @param key
* @param val
*/
public void addInput(K1 key, V1 val) {
inputList.add(new Pair<K1, V1>(key, val));
}
/**
* Identical to addInput() but returns self for fluent programming style
* @param key
* @param val
* @return this
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withInput(K1 key, V1 val) {
addInput(key, val);
return this;
}
/**
* Adds an input to send to the Mapper
* @param input The (k, v) pair to add to the input list.
*/
public void addInput(Pair<K1, V1> input) {
if (null == input) {
throw new IllegalArgumentException("Null input in addInput()");
}
inputList.add(input);
}
/**
* Identical to addInput() but returns self for fluent programming style
* @param input The (k, v) pair to add
* @return this
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withInput(
Pair<K1, V1> input) {
addInput(input);
return this;
}
/**
* Adds an output (k, v) pair we expect from the Reducer
* @param outputRecord The (k, v) pair to add
*/
public void addOutput(Pair<K2, V2> outputRecord) {
if (null != outputRecord) {
expectedOutputs.add(outputRecord);
} else {
throw new IllegalArgumentException("Tried to add null outputRecord");
}
}
/**
* Works like addOutput(), but returns self for fluent style
* @param outputRecord
* @return this
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withOutput(
Pair<K2, V2> outputRecord) {
addOutput(outputRecord);
return this;
}
/**
* Adds a (k, v) pair we expect as output from the Reducer
* @param key
* @param val
*/
public void addOutput(K2 key, V2 val) {
addOutput(new Pair<K2, V2>(key, val));
}
/**
* Functions like addOutput() but returns self for fluent programming style
* @param key
* @param val
* @return this
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withOutput(K2 key, V2 val) {
addOutput(key, val);
return this;
}
/**
* Expects an input of the form "key \t val"
* Forces the Mapper input types to Text.
* @param input A string of the form "key \t val". Trims any whitespace.
*/
public void addInputFromString(String input) {
if (null == input) {
throw new IllegalArgumentException("null input given to setInput");
} else {
Pair<Text, Text> inputPair = parseTabbedPair(input);
if (null != inputPair) {
// I know this is not type-safe, but I don't
// know a better way to do this.
addInput((Pair<K1, V1>) inputPair);
} else {
throw new IllegalArgumentException("Could not parse input pair in addInput");
}
}
}
/**
* Identical to addInputFromString, but with a fluent programming style
* @param input A string of the form "key \t val". Trims any whitespace.
* @return this
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withInputFromString(String input) {
addInputFromString(input);
return this;
}
/**
* Expects an input of the form "key \t val"
* Forces the Reducer output types to Text.
* @param output A string of the form "key \t val". Trims any whitespace.
*/
public void addOutputFromString(String output) {
if (null == output) {
throw new IllegalArgumentException("null input given to setOutput");
} else {
Pair<Text, Text> outputPair = parseTabbedPair(output);
if (null != outputPair) {
// I know this is not type-safe,
// but I don't know a better way to do this.
addOutput((Pair<K2, V2>) outputPair);
} else {
throw new IllegalArgumentException(
"Could not parse output pair in setOutput");
}
}
}
/**
* Identical to addOutputFromString, but with a fluent programming style
* @param output A string of the form "key \t val". Trims any whitespace.
* @return this
*/
public PipelineMapReduceDriver<K1, V1, K2, V2> withOutputFromString(String output) {
addOutputFromString(output);
return this;
}
public List<Pair<K2, V2>> run() throws IOException {
// inputs starts with the user-provided inputs.
List inputs = this.inputList;
if (mapReducePipeline.size() == 0) {
LOG.warn("No Mapper or Reducer instances in pipeline; this is a trivial test.");
}
if (inputs.size() == 0) {
LOG.warn("No inputs configured to send to MapReduce pipeline; this is a trivial test.");
}
for (Pair<Mapper, Reducer> job : mapReducePipeline) {
// Create a MapReduceDriver to run this phase of the pipeline.
MapReduceDriver mrDriver = new MapReduceDriver(job.getFirst(), job.getSecond());
mrDriver.setCounters(getCounters());
// Add the inputs from the user, or from the previous stage of the pipeline.
for (Object input : inputs) {
mrDriver.addInput((Pair) input);
}
// Run the MapReduce "job". The output of this job becomes
// the input to the next job.
inputs = mrDriver.run();
}
// The last list of values stored in "inputs" is actually the outputs.
// Unfortunately, due to the variable-length list of MR passes the user
// can test, this is not type-safe.
return (List<Pair<K2, V2>>) inputs;
}
@Override
public void runTest() throws RuntimeException {
List<Pair<K2, V2>> outputs = null;
boolean succeeded;
try {
outputs = run();
validate(outputs);
} catch (IOException ioe) {
LOG.error("IOException: " + ioe.toString());
LOG.debug("Setting success to false based on IOException");
throw new RuntimeException();
}
}
}