blob: 3441b7c54b57ce24a3ab46c882f62e85fe181532 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.chain;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Stringifier;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.util.ReflectionUtils;
/**
* The Chain class provides all the common functionality for the
* {@link ChainMapper} and the {@link ChainReducer} classes.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Chain {
// Configuration key prefixes/suffixes under which the chain layout is
// serialized into the job configuration.
protected static final String CHAIN_MAPPER = "mapreduce.chain.mapper";
protected static final String CHAIN_REDUCER = "mapreduce.chain.reducer";

protected static final String CHAIN_MAPPER_SIZE = ".size";
protected static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
protected static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
protected static final String CHAIN_REDUCER_CLASS = ".reducer.class";
protected static final String CHAIN_REDUCER_CONFIG = ".reducer.config";

// Keys, stored in each element's own Configuration, that record the declared
// key/value classes of that chain element.
protected static final String MAPPER_INPUT_KEY_CLASS =
    "mapreduce.chain.mapper.input.key.class";
protected static final String MAPPER_INPUT_VALUE_CLASS =
    "mapreduce.chain.mapper.input.value.class";
protected static final String MAPPER_OUTPUT_KEY_CLASS =
    "mapreduce.chain.mapper.output.key.class";
protected static final String MAPPER_OUTPUT_VALUE_CLASS =
    "mapreduce.chain.mapper.output.value.class";
protected static final String REDUCER_INPUT_KEY_CLASS =
    "mapreduce.chain.reducer.input.key.class";
// FIX: this key was previously misspelled "maperduce.chain...". The constant
// is the single point through which the key is both written and read, so the
// corrected spelling is self-consistent within this class; external tooling
// that hard-coded the old misspelled string would need updating.
protected static final String REDUCER_INPUT_VALUE_CLASS =
    "mapreduce.chain.reducer.input.value.class";
protected static final String REDUCER_OUTPUT_KEY_CLASS =
    "mapreduce.chain.reducer.output.key.class";
protected static final String REDUCER_OUTPUT_VALUE_CLASS =
    "mapreduce.chain.reducer.output.value.class";

// TRUE when this chain backs a ChainMapper, FALSE for a ChainReducer.
protected boolean isMap;

@SuppressWarnings("unchecked")
private List<Mapper> mappers = new ArrayList<Mapper>();
private Reducer<?, ?, ?, ?> reducer;
// Per-mapper configurations, parallel to 'mappers'.
private List<Configuration> confList = new ArrayList<Configuration>();
// Configuration of the chained Reducer, set only when one is configured.
private Configuration rConf;
// One thread per chained mapper/reducer, plus the hand-off queues linking
// them; used for chain-wide start/join/interrupt.
private List<Thread> threads = new ArrayList<Thread>();
private List<ChainBlockingQueue<?>> blockingQueues =
    new ArrayList<ChainBlockingQueue<?>>();
// First failure observed by any chain thread; guarded by 'this'.
private Throwable throwable = null;
/**
 * Creates a Chain instance configured for a Mapper or a Reducer.
 *
 * @param isMap
 *          TRUE indicates the chain is for a Mapper, FALSE that it is for a
 *          Reducer.
 */
protected Chain(boolean isMap) {
  this.isMap = isMap;
}
/**
 * Record handed between chain links over a ChainBlockingQueue: either a
 * key/value pair, or an end-of-input marker carrying no data.
 */
static class KeyValuePair<K, V> {
  K key;
  V value;
  boolean endOfInput;

  /** Creates a regular data record; end-of-input is implicitly false. */
  KeyValuePair(K key, V value) {
    this.key = key;
    this.value = value;
    this.endOfInput = false;
  }

  /** Creates a marker record whose only payload is the end-of-input flag. */
  KeyValuePair(boolean eof) {
    this(null, null);
    this.endOfInput = eof;
  }
}
// ChainRecordReader either reads from a blocking queue (when it feeds an
// intermediate link of the chain) or from the task context (first link).
private static class ChainRecordReader<KEYIN, VALUEIN> extends
    RecordReader<KEYIN, VALUEIN> {
  private Class<?> keyClass;
  private Class<?> valueClass;
  private KEYIN key;
  private VALUEIN value;
  private Configuration conf;
  TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> inputContext = null;
  ChainBlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue = null;

  // constructor to read from a blocking queue
  ChainRecordReader(Class<?> keyClass, Class<?> valueClass,
      ChainBlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue,
      Configuration conf) {
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.inputQueue = inputQueue;
    this.conf = conf;
  }

  // constructor to read from the context
  ChainRecordReader(TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> context) {
    inputContext = context;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // No-op: records come either from the queue or from inputContext,
    // both of which are supplied through the constructors.
  }

  /**
   * Advances to the next key/value pair.
   *
   * @return true if a key/value pair was read, false at end of input
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (inputQueue != null) {
      return readFromQueue();
    } else if (inputContext.nextKeyValue()) {
      this.key = inputContext.getCurrentKey();
      this.value = inputContext.getCurrentValue();
      return true;
    } else {
      return false;
    }
  }

  @SuppressWarnings("unchecked")
  private boolean readFromQueue() throws IOException, InterruptedException {
    KeyValuePair<KEYIN, VALUEIN> kv = null;
    // wait for input on queue
    kv = inputQueue.dequeue();
    if (kv.endOfInput) {
      return false;
    }
    // Deep-copy the dequeued objects into fresh instances so the producer
    // thread can reuse its own objects without racing with this consumer.
    key = (KEYIN) ReflectionUtils.newInstance(keyClass, conf);
    value = (VALUEIN) ReflectionUtils.newInstance(valueClass, conf);
    ReflectionUtils.copy(conf, kv.key, this.key);
    ReflectionUtils.copy(conf, kv.value, this.value);
    return true;
  }

  /**
   * Get the current key.
   *
   * @return the current key object or null if there isn't one
   */
  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return this.key;
  }

  /**
   * Get the current value.
   *
   * @return the value object that was read into
   */
  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return this.value;
  }

  @Override
  public void close() throws IOException {
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return 0;
  }
}
// ChainRecordWriter either writes to blocking queue or task context
private static class ChainRecordWriter<KEYOUT, VALUEOUT> extends
RecordWriter<KEYOUT, VALUEOUT> {
TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> outputContext = null;
ChainBlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> outputQueue = null;
KEYOUT keyout;
VALUEOUT valueout;
Configuration conf;
Class<?> keyClass;
Class<?> valueClass;
// constructor to write to context
ChainRecordWriter(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
outputContext = context;
}
// constructor to write to blocking queue
ChainRecordWriter(Class<?> keyClass, Class<?> valueClass,
ChainBlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> output,
Configuration conf) {
this.keyClass = keyClass;
this.valueClass = valueClass;
this.outputQueue = output;
this.conf = conf;
}
/**
* Writes a key/value pair.
*
* @param key
* the key to write.
* @param value
* the value to write.
* @throws IOException
*/
public void write(KEYOUT key, VALUEOUT value) throws IOException,
InterruptedException {
if (outputQueue != null) {
writeToQueue(key, value);
} else {
outputContext.write(key, value);
}
}
@SuppressWarnings("unchecked")
private void writeToQueue(KEYOUT key, VALUEOUT value) throws IOException,
InterruptedException {
this.keyout = (KEYOUT) ReflectionUtils.newInstance(keyClass, conf);
this.valueout = (VALUEOUT) ReflectionUtils.newInstance(valueClass, conf);
ReflectionUtils.copy(conf, key, this.keyout);
ReflectionUtils.copy(conf, value, this.valueout);
// wait to write output to queuue
outputQueue.enqueue(new KeyValuePair<KEYOUT, VALUEOUT>(keyout, valueout));
}
/**
* Close this <code>RecordWriter</code> to future operations.
*
* @param context
* the context of the task
* @throws IOException
*/
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
if (outputQueue != null) {
// write end of input
outputQueue.enqueue(new KeyValuePair<KEYOUT, VALUEOUT>(true));
}
}
}
/** Returns the first failure recorded by any chain thread, or null. */
private synchronized Throwable getThrowable() {
  return throwable;
}
/**
 * Records the given failure if none has been recorded yet.
 *
 * @param th the failure to record
 * @return true if this call recorded the failure (i.e. it was the first),
 *         false if another failure was already recorded
 */
private synchronized boolean setIfUnsetThrowable(Throwable th) {
  if (throwable != null) {
    // Keep the first failure; later ones are ignored.
    return false;
  }
  throwable = th;
  return true;
}
/**
 * Thread that drives one chained Mapper, reading records through the given
 * RecordReader and emitting through the given RecordWriter. On failure it
 * records the first error for the whole chain and interrupts its siblings.
 */
private class MapRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Thread {
  private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
  private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context chainContext;
  private RecordReader<KEYIN, VALUEIN> rr;
  private RecordWriter<KEYOUT, VALUEOUT> rw;

  public MapRunner(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper,
      Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext,
      RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw)
      throws IOException, InterruptedException {
    this.mapper = mapper;
    this.rr = rr;
    this.rw = rw;
    this.chainContext = mapperContext;
  }

  @Override
  public void run() {
    // If another link of the chain has already failed, don't start at all.
    if (getThrowable() != null) {
      return;
    }
    try {
      mapper.run(chainContext);
      // Closed only on success: rw.close() publishes the end-of-input
      // marker to the downstream queue (see ChainRecordWriter.close).
      rr.close();
      rw.close(chainContext);
    } catch (Throwable th) {
      // Only the first recorded failure interrupts the other threads and
      // queues, unblocking any peers stuck in enqueue/dequeue.
      if (setIfUnsetThrowable(th)) {
        interruptAllThreads();
      }
    }
  }
}
/**
 * Thread that drives the chained Reducer, emitting through the given
 * RecordWriter (typically into the queue feeding the next mapper). On
 * failure it records the first error for the chain and interrupts siblings.
 */
private class ReduceRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Thread {
  private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
  private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context chainContext;
  private RecordWriter<KEYOUT, VALUEOUT> rw;

  ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
      Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
      RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
      InterruptedException {
    this.reducer = reducer;
    this.chainContext = context;
    this.rw = rw;
  }

  @Override
  public void run() {
    // Consistency fix: mirror MapRunner and skip execution entirely when
    // another link of the chain has already recorded a failure.
    if (getThrowable() != null) {
      return;
    }
    try {
      reducer.run(chainContext);
      // Closed only on success: rw.close() publishes the end-of-input
      // marker to the downstream queue.
      rw.close(chainContext);
    } catch (Throwable th) {
      // Only the first recorded failure interrupts the other threads.
      if (setIfUnsetThrowable(th)) {
        interruptAllThreads();
      }
    }
  }
}
/** Returns the configuration of the index-th chained mapper. */
Configuration getConf(int index) {
  return confList.get(index);
}
/**
 * Builds a mapper context backed by ChainMapContextImpl, so the mapper's
 * reads go through the given record reader and its writes through the given
 * record writer instead of the task's own input/output.
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
    RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chainedContext =
      new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
          context, rr, rw, conf);
  // Wrap so the mapper sees a plain Mapper.Context.
  return new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getMapContext(chainedContext);
}
/**
 * Runs a single chained mapper synchronously against the task's own
 * context (used when no hand-off thread/queue is required).
 */
@SuppressWarnings("unchecked")
void runMapper(TaskInputOutputContext context, int index) throws IOException,
    InterruptedException {
  Mapper chainedMapper = mappers.get(index);
  RecordReader reader = new ChainRecordReader(context);
  RecordWriter writer = new ChainRecordWriter(context);
  Mapper.Context mapperContext =
      createMapContext(reader, writer, context, getConf(index));
  chainedMapper.run(mapperContext);
  reader.close();
  writer.close(context);
}
/**
 * Adds the first mapper of the chain: it consumes records from the task's
 * input context and hands its output to the given blocking queue.
 */
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
    throws IOException, InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordReader reader = new ChainRecordReader(inputContext);
  RecordWriter writer = new ChainRecordWriter(keyOutClass, valueOutClass,
      output, conf);
  Mapper.Context mapperContext = createMapContext(reader, writer,
      (MapContext) inputContext, conf);
  threads.add(
      new MapRunner(mappers.get(index), mapperContext, reader, writer));
}
/**
 * Adds the last mapper of the chain: it consumes records from the given
 * blocking queue and writes its output to the task's output context.
 */
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
    TaskInputOutputContext outputContext, int index) throws IOException,
    InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
  Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
  RecordReader reader =
      new ChainRecordReader(keyClass, valueClass, input, conf);
  RecordWriter writer = new ChainRecordWriter(outputContext);
  Mapper.Context mapperContext =
      createMapContext(reader, writer, outputContext, conf);
  threads.add(
      new MapRunner(mappers.get(index), mapperContext, reader, writer));
}
/**
 * Adds an intermediate mapper of the chain: it consumes records from one
 * blocking queue and hands its output to another.
 */
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
    ChainBlockingQueue<KeyValuePair<?, ?>> output,
    TaskInputOutputContext context, int index) throws IOException,
    InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
  Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
  Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordReader reader =
      new ChainRecordReader(keyClass, valueClass, input, conf);
  RecordWriter writer = new ChainRecordWriter(keyOutClass, valueOutClass,
      output, conf);
  Mapper.Context mapperContext =
      createMapContext(reader, writer, context, conf);
  threads.add(
      new MapRunner(mappers.get(index), mapperContext, reader, writer));
}
/**
 * Builds a reducer context backed by ChainReduceContextImpl, so the
 * reducer's writes go through the given record writer instead of the
 * task's own output.
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> chainedContext =
      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
          context, rw, conf);
  // Wrap so the reducer sees a plain Reducer.Context.
  return new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(chainedContext);
}
// Run the chained reducer synchronously against the task's own context
// (used when no post-reduce mappers require a hand-off thread).
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> writer =
      new ChainRecordWriter<KEYOUT, VALUEOUT>(context);
  Reducer.Context reducerContext =
      createReduceContext(writer, (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  writer.close(context);
}
/**
 * Adds the chain's reducer: it reads from the task's reduce context and
 * hands its output to the queue feeding the first post-reduce mapper.
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {
  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter writer = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(writer,
      (ReduceContext) inputContext, rConf);
  threads.add(new ReduceRunner(reducerContext, reducer, writer));
}
// Start every chain thread (map runners and the optional reduce runner).
void startAllThreads() {
  for (Thread thread : threads) {
    thread.start();
  }
}
// Wait for every chain thread to finish, then surface the first recorded
// failure (if any) as the checked type the caller expects.
void joinAllThreads() throws IOException, InterruptedException {
  for (Thread thread : threads) {
    thread.join();
  }
  Throwable th = getThrowable();
  if (th != null) {
    if (th instanceof IOException) {
      throw (IOException) th;
    } else if (th instanceof InterruptedException) {
      throw (InterruptedException) th;
    } else {
      // Any other Throwable (including Errors) is wrapped unconditionally.
      throw new RuntimeException(th);
    }
  }
}
// Interrupt every chain thread and wake every hand-off queue so that peers
// blocked in enqueue/dequeue abort promptly after a failure.
private synchronized void interruptAllThreads() {
  for (Thread th : threads) {
    th.interrupt();
  }
  for (ChainBlockingQueue<?> queue : blockingQueues) {
    queue.interrupt();
  }
}
/**
 * Returns the prefix to use for the configuration of the chain depending if
 * it is for a Mapper or a Reducer.
 *
 * @param isMap
 *          TRUE for Mapper, FALSE for Reducer.
 * @return the prefix to use.
 */
protected static String getPrefix(boolean isMap) {
  if (isMap) {
    return CHAIN_MAPPER;
  }
  return CHAIN_REDUCER;
}
// Returns the number of chain elements already stored under the given
// prefix (0 if none). Note: the ".size" suffix (CHAIN_MAPPER_SIZE) is
// shared by both the mapper and the reducer prefixes.
protected static int getIndex(Configuration conf, String prefix) {
  return conf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
}
/**
 * Creates a {@link Configuration} for the Map or Reduce in the chain.
 *
 * <p>
 * It creates a new Configuration using the chain job's Configuration as base
 * and adds to it the configuration properties for the chain element. The keys
 * of the chain element Configuration have precedence over the given
 * Configuration.
 * </p>
 *
 * @param jobConf
 *          the chain job's Configuration.
 * @param confKey
 *          the key for chain element configuration serialized in the chain
 *          job's Configuration.
 * @return a new Configuration aggregating the chain job's Configuration with
 *         the chain element configuration properties.
 */
protected static Configuration getChainElementConf(Configuration jobConf,
    String confKey) {
  Configuration conf = null;
  try {
    Stringifier<Configuration> stringifier =
        new DefaultStringifier<Configuration>(jobConf, Configuration.class);
    String confString = jobConf.get(confKey, null);
    if (confString != null) {
      // Reuse the value already fetched above instead of performing a
      // second, redundant jobConf.get(confKey, null) lookup.
      conf = stringifier.fromString(confString);
    }
  } catch (IOException ioex) {
    throw new RuntimeException(ioex);
  }
  // we have to do this because the Writable deserialization clears all
  // values set in the conf making it not possible to do a
  // new Configuration(jobConf) in the creation of the conf above
  jobConf = new Configuration(jobConf);
  if (conf != null) {
    // Element properties overlay (take precedence over) the job's.
    for (Map.Entry<String, String> entry : conf) {
      jobConf.set(entry.getKey(), entry.getValue());
    }
  }
  return jobConf;
}
/**
 * Adds a Mapper class to the chain job.
 *
 * <p/>
 * The configuration properties of the chain job have precedence over the
 * configuration properties of the Mapper.
 *
 * @param isMap
 *          indicates if the Chain is for a Mapper or for a Reducer.
 * @param job
 *          chain job.
 * @param klass
 *          the Mapper class to add.
 * @param inputKeyClass
 *          mapper input key class.
 * @param inputValueClass
 *          mapper input value class.
 * @param outputKeyClass
 *          mapper output key class.
 * @param outputValueClass
 *          mapper output value class.
 * @param mapperConf
 *          a configuration for the Mapper class. It is recommended to use a
 *          Configuration without default values using the
 *          <code>Configuration(boolean loadDefaults)</code> constructor with
 *          FALSE.
 * @throws IllegalStateException if this is a reducer chain and the Reducer
 *           has not been set yet (mappers must come after the reducer).
 */
@SuppressWarnings("unchecked")
protected static void addMapper(boolean isMap, Job job,
    Class<? extends Mapper> klass, Class<?> inputKeyClass,
    Class<?> inputValueClass, Class<?> outputKeyClass,
    Class<?> outputValueClass, Configuration mapperConf) {
  String prefix = getPrefix(isMap);
  Configuration jobConf = job.getConfiguration();
  // if a reducer chain check the Reducer has been already set
  checkReducerAlreadySet(isMap, jobConf, prefix, true);
  // set the mapper class
  int index = getIndex(jobConf, prefix);
  jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
  // verify this mapper's input types are compatible with the previous
  // element's output types before serializing anything else
  validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
      outputKeyClass, outputValueClass, index, prefix);
  // serialize the element conf and bump the chain size counter
  setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
      outputKeyClass, outputValueClass, mapperConf, index, prefix);
}
/**
 * For reducer chains, enforces ordering of the configuration calls:
 * when {@code shouldSet} is true the Reducer must already be configured
 * (a Mapper is being appended after it); when false the Reducer must not
 * be configured yet (it is about to be set). No-op for mapper chains.
 */
protected static void checkReducerAlreadySet(boolean isMap,
    Configuration jobConf, String prefix, boolean shouldSet) {
  if (isMap) {
    // Only meaningful for reducer chains.
    return;
  }
  boolean reducerSet =
      jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null;
  if (shouldSet && !reducerSet) {
    throw new IllegalStateException(
        "A Mapper can be added to the chain only after the Reducer has "
            + "been set");
  }
  if (!shouldSet && reducerSet) {
    throw new IllegalStateException("Reducer has been already set");
  }
}
/**
 * Verifies that the key/value input classes declared for a new chain element
 * are assignment-compatible with the output classes of the previous element
 * (the Reducer for the first post-reduce Mapper, the preceding Mapper
 * otherwise).
 */
protected static void validateKeyValueTypes(boolean isMap,
    Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass, int index,
    String prefix) {
  // if it is a reducer chain and the first Mapper is being added check the
  // key and value input classes of the mapper match those of the reducer
  // output.
  if (!isMap && index == 0) {
    Configuration reducerConf = getChainElementConf(jobConf, prefix
        + CHAIN_REDUCER_CONFIG);
    // NOTE(review): if the reducer's output classes were never serialized,
    // getClass(..., null) returns null and isAssignableFrom(null) throws a
    // bare NPE instead of a descriptive error — confirm whether that state
    // is reachable through the public ChainReducer API.
    if (!inputKeyClass.isAssignableFrom(reducerConf.getClass(
        REDUCER_OUTPUT_KEY_CLASS, null))) {
      throw new IllegalArgumentException("The Reducer output key class does"
          + " not match the Mapper input key class");
    }
    if (!inputValueClass.isAssignableFrom(reducerConf.getClass(
        REDUCER_OUTPUT_VALUE_CLASS, null))) {
      throw new IllegalArgumentException("The Reducer output value class"
          + " does not match the Mapper input value class");
    }
  } else if (index > 0) {
    // check the that the new Mapper in the chain key and value input classes
    // match those of the previous Mapper output.
    Configuration previousMapperConf = getChainElementConf(jobConf, prefix
        + CHAIN_MAPPER_CONFIG + (index - 1));
    // NOTE(review): the exception wording below reads swapped — the check is
    // that the NEW Mapper's INPUT classes accept the previous Mapper's
    // OUTPUT classes. Message left untouched since callers/tests may match
    // on the exact text; confirm before rewording.
    if (!inputKeyClass.isAssignableFrom(previousMapperConf.getClass(
        MAPPER_OUTPUT_KEY_CLASS, null))) {
      throw new IllegalArgumentException("The Mapper output key class does"
          + " not match the previous Mapper input key class");
    }
    if (!inputValueClass.isAssignableFrom(previousMapperConf.getClass(
        MAPPER_OUTPUT_VALUE_CLASS, null))) {
      throw new IllegalArgumentException("The Mapper output value class"
          + " does not match the previous Mapper input value class");
    }
  }
}
/**
 * Stores the mapper's declared key/value classes in its per-element
 * configuration, serializes that configuration into the chain job's
 * configuration, and increments the chain size counter.
 */
protected static void setMapperConf(boolean isMap, Configuration jobConf,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration mapperConf, int index, String prefix) {
  // if the Mapper does not have a configuration, create an empty one
  if (mapperConf == null) {
    // using a Configuration without defaults to make it lightweight.
    // still the chain's conf may have all defaults and this conf is
    // overlapped to the chain configuration one.
    // FIX: was new Configuration(true), which loads all defaults and
    // contradicts the comment above; setReducerConf already passes false.
    mapperConf = new Configuration(false);
  }
  // store the input/output classes of the mapper in the mapper conf
  mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  mapperConf
      .setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, Object.class);
  mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
  mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);
  // serialize the mapper configuration in the chain configuration.
  Stringifier<Configuration> stringifier =
      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
  try {
    jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, stringifier
        .toString(new Configuration(mapperConf)));
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
  // increment the chain counter
  jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
}
/**
 * Sets the Reducer class to the chain job.
 *
 * <p/>
 * The configuration properties of the chain job have precedence over the
 * configuration properties of the Reducer.
 *
 * @param job
 *          the chain job.
 * @param klass
 *          the Reducer class to add.
 * @param inputKeyClass
 *          reducer input key class.
 * @param inputValueClass
 *          reducer input value class.
 * @param outputKeyClass
 *          reducer output key class.
 * @param outputValueClass
 *          reducer output value class.
 * @param reducerConf
 *          a configuration for the Reducer class. It is recommended to use a
 *          Configuration without default values using the
 *          <code>Configuration(boolean loadDefaults)</code> constructor with
 *          FALSE.
 * @throws IllegalStateException if a Reducer has already been set.
 */
@SuppressWarnings("unchecked")
protected static void setReducer(Job job, Class<? extends Reducer> klass,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration reducerConf) {
  String prefix = getPrefix(false);
  Configuration jobConf = job.getConfiguration();
  // only one Reducer is allowed per chain (shouldSet=false -> must be unset)
  checkReducerAlreadySet(false, jobConf, prefix, false);
  jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
  // store the reducer's key/value classes and serialize its configuration
  setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
      outputValueClass, reducerConf, prefix);
}
/**
 * Stores the reducer's declared key/value classes in its per-element
 * configuration and serializes that configuration into the chain job's
 * configuration.
 */
protected static void setReducerConf(Configuration jobConf,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration reducerConf, String prefix) {
  // if the Reducer does not have a Configuration, create an empty one
  if (reducerConf == null) {
    // using a Configuration without defaults to make it lightweight.
    // still the chain's conf may have all defaults and this conf is
    // overlapped to the chain's Configuration one.
    reducerConf = new Configuration(false);
  }
  // store the input/output classes of the reducer in
  // the reducer configuration
  reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
      Object.class);
  reducerConf
      .setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
  reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
      Object.class);
  // serialize the reducer configuration in the chain's configuration.
  Stringifier<Configuration> stringifier =
      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
  try {
    jobConf.set(prefix + CHAIN_REDUCER_CONFIG, stringifier
        .toString(new Configuration(reducerConf)));
  } catch (IOException ioEx) {
    throw new RuntimeException(ioEx);
  }
}
/**
 * Setup the chain: deserializes from the job configuration the list of
 * chained Mappers (each with its own overlaid Configuration) and the
 * optional Reducer, instantiating them via reflection.
 *
 * @param jobConf
 *          chain job's {@link Configuration}.
 */
@SuppressWarnings("unchecked")
void setup(Configuration jobConf) {
  String prefix = getPrefix(isMap);
  // number of mappers serialized under this prefix
  int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
  for (int i = 0; i < index; i++) {
    Class<? extends Mapper> klass = jobConf.getClass(prefix
        + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
    // per-element conf: chain conf overlaid with the element's own settings
    Configuration mConf = getChainElementConf(jobConf, prefix
        + CHAIN_MAPPER_CONFIG + i);
    confList.add(mConf);
    Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
    mappers.add(mapper);
  }
  // the reducer is optional; only a ChainReducer chain serializes one
  Class<? extends Reducer> klass = jobConf.getClass(prefix
      + CHAIN_REDUCER_CLASS, null, Reducer.class);
  if (klass != null) {
    rConf = getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
    reducer = ReflectionUtils.newInstance(klass, rConf);
  }
}
/** Returns the list of chained Mapper instances (populated by setup()). */
@SuppressWarnings("unchecked")
List<Mapper> getAllMappers() {
  return mappers;
}
/**
 * Returns the Reducer instance in the chain.
 *
 * @return the Reducer instance in the chain or NULL if none.
 */
Reducer<?, ?, ?, ?> getReducer() {
  return reducer;
}
/**
 * Creates a ChainBlockingQueue with KeyValuePair as element. The queue
 * registers itself with this chain so interruptAllThreads() can wake it.
 *
 * @return the ChainBlockingQueue
 */
ChainBlockingQueue<KeyValuePair<?, ?>> createBlockingQueue() {
  return new ChainBlockingQueue<KeyValuePair<?, ?>>();
}
/**
 * A blocking queue with one element, used to hand records from one chain
 * thread to the next. enqueue() blocks while the slot is occupied and
 * dequeue() blocks while it is empty; interrupt() wakes all waiters so
 * blocked threads can abort after a chain failure.
 *
 * @param <E> type of the single buffered element
 */
class ChainBlockingQueue<E> {
  // The single slot; null means empty. Guarded by this queue's monitor.
  E element = null;
  // Once set by interrupt(), all current and future waits fail fast.
  boolean isInterrupted = false;

  ChainBlockingQueue() {
    // Register with the enclosing Chain for chain-wide interruption.
    blockingQueues.add(this);
  }

  // Blocks until the slot is free, then publishes 'e' and wakes a waiter.
  synchronized void enqueue(E e) throws InterruptedException {
    while (element != null) {
      if (isInterrupted) {
        throw new InterruptedException();
      }
      this.wait();
    }
    element = e;
    this.notify();
  }

  // Blocks until the slot is filled, then empties it and wakes a waiter.
  synchronized E dequeue() throws InterruptedException {
    while (element == null) {
      if (isInterrupted) {
        throw new InterruptedException();
      }
      this.wait();
    }
    E e = element;
    element = null;
    this.notify();
    return e;
  }

  // Marks the queue interrupted and wakes every thread waiting on it.
  synchronized void interrupt() {
    isInterrupted = true;
    this.notifyAll();
  }
}
}