/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.GenericsUtil;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* The Chain class provides all the common functionality for the
* {@link ChainMapper} and the {@link ChainReducer} classes.
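* <p/>
* A chain is normally assembled through the public {@link ChainMapper} and
* {@link ChainReducer} helpers rather than by using this class directly. The
* sketch below is illustrative only; <code>MyJob</code>, <code>AMap</code>,
* <code>XReduce</code> and the key/value classes are placeholders, not part
* of this API:
* <pre>
* JobConf conf = new JobConf(MyJob.class); // MyJob is a placeholder driver
* ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
*   Text.class, Text.class, true, new JobConf(false));
* ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
*   Text.class, Text.class, true, new JobConf(false));
* </pre>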
* @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
*/
@Deprecated
class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
private JobConf chainJobConf;
private List<Mapper> mappers = new ArrayList<Mapper>();
private Reducer reducer;
// cache of the key/value output class serializations for each chain element,
// to avoid looking them up for every record
private List<Serialization> mappersKeySerialization =
new ArrayList<Serialization>();
private List<Serialization> mappersValueSerialization =
new ArrayList<Serialization>();
private Serialization reducerKeySerialization;
private Serialization reducerValueSerialization;
/**
* Creates a Chain instance configured for a Mapper or a Reducer.
*
* @param isMap TRUE indicates the chain is for a Mapper, FALSE that it is
* for a Reducer.
*/
Chain(boolean isMap) {
super(isMap);
}
/**
* Adds a Mapper class to the chain job's JobConf.
* <p/>
* The configuration properties of the chain job have precedence over the
* configuration properties of the Mapper.
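* <p/>
* A minimal, illustrative call (<code>AMap</code> and the key/value classes
* are placeholders):
* <pre>
* JobConf aMapConf = new JobConf(false);
* aMapConf.set("a.map.setting", "some value");
* Chain.addMapper(true, jobConf, AMap.class,
*   LongWritable.class, Text.class, Text.class, Text.class,
*   true, aMapConf);
* </pre>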
*
* @param isMap indicates if the Chain is for a Mapper or for a
* Reducer.
* @param jobConf chain job's JobConf to add the Mapper class.
* @param klass the Mapper class to add.
* @param inputKeyClass mapper input key class.
* @param inputValueClass mapper input value class.
* @param outputKeyClass mapper output key class.
* @param outputValueClass mapper output value class.
* @param byValue indicates if key/values should be passed by value
* to the next Mapper in the chain, if any.
* @param mapperConf a JobConf with the configuration for the Mapper
* class. It is recommended to use a JobConf without default values using the
* <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
*/
public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf,
Class<? extends Mapper<K1, V1, K2, V2>> klass,
Class<? extends K1> inputKeyClass,
Class<? extends V1> inputValueClass,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass,
boolean byValue, JobConf mapperConf) {
String prefix = getPrefix(isMap);
// if this is a reducer chain, check that the Reducer has already been set
checkReducerAlreadySet(isMap, jobConf, prefix, true);
// set the mapper class
int index = getIndex(jobConf, prefix);
jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
outputKeyClass, outputValueClass, index, prefix);
// if the Mapper does not have a private JobConf create an empty one
if (mapperConf == null) {
// using a JobConf without defaults to make it lightweight.
// the chain JobConf may still have all the defaults; this conf is
// overlaid on top of the chain JobConf.
mapperConf = new JobConf(false);
}
// store in the private mapper conf whether it works by value or by reference
mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
outputKeyClass, outputValueClass, mapperConf, index, prefix);
}
/**
* Sets the Reducer class to the chain job's JobConf.
* <p/>
* The configuration properties of the chain job have precedence over the
* configuration properties of the Reducer.
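* <p/>
* A minimal, illustrative call (<code>XReduce</code> and the key/value
* classes are placeholders):
* <pre>
* JobConf xReduceConf = new JobConf(false);
* Chain.setReducer(jobConf, XReduce.class,
*   LongWritable.class, Text.class, Text.class, Text.class,
*   true, xReduceConf);
* </pre>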
*
* @param jobConf chain job's JobConf to add the Reducer class.
* @param klass the Reducer class to add.
* @param inputKeyClass reducer input key class.
* @param inputValueClass reducer input value class.
* @param outputKeyClass reducer output key class.
* @param outputValueClass reducer output value class.
* @param byValue indicates if key/values should be passed by value
* to the next Mapper in the chain, if any.
* @param reducerConf a JobConf with the configuration for the Reducer
* class. It is recommended to use a JobConf without default values using the
* <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
*/
public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
Class<? extends Reducer<K1, V1, K2, V2>> klass,
Class<? extends K1> inputKeyClass,
Class<? extends V1> inputValueClass,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass,
boolean byValue, JobConf reducerConf) {
String prefix = getPrefix(false);
checkReducerAlreadySet(false, jobConf, prefix, false);
jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
// if the Reducer does not have a private JobConf create an empty one
if (reducerConf == null) {
// using a JobConf without defaults to make it lightweight.
// the chain JobConf may still have all the defaults; this conf is
// overlaid on top of the chain JobConf.
reducerConf = new JobConf(false);
}
// store in the private reducer conf the input/output classes of the reducer
// and whether it works by value or by reference
reducerConf.setBoolean(REDUCER_BY_VALUE, byValue);
setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
outputValueClass, reducerConf, prefix);
}
/**
* Configures all the chain elements for the task.
*
* @param jobConf chain job's JobConf.
*/
public void configure(JobConf jobConf) {
String prefix = getPrefix(isMap);
chainJobConf = jobConf;
SerializationFactory serializationFactory =
new SerializationFactory(chainJobConf);
int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
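// instantiate every Mapper in the chain from its serialized private JobConf
// and, when running by value, cache the serializations used to copy its
// output key/values to the next element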
for (int i = 0; i < index; i++) {
Class<? extends Mapper> klass =
jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
JobConf mConf = new JobConf(
getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
mappers.add(mapper);
if (mConf.getBoolean(MAPPER_BY_VALUE, true)) {
mappersKeySerialization.add(serializationFactory.getSerialization(
mConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null)));
mappersValueSerialization.add(serializationFactory.getSerialization(
mConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null)));
} else {
mappersKeySerialization.add(null);
mappersValueSerialization.add(null);
}
}
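// instantiate the Reducer, if one has been set for this chain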
Class<? extends Reducer> klass =
jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
if (klass != null) {
JobConf rConf = new JobConf(
getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
reducer = ReflectionUtils.newInstance(klass, rConf);
if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
reducerKeySerialization = serializationFactory
.getSerialization(rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null));
reducerValueSerialization = serializationFactory
.getSerialization(rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null));
} else {
reducerKeySerialization = null;
reducerValueSerialization = null;
}
}
}
/**
* Returns the chain job conf.
*
* @return the chain job conf.
*/
protected JobConf getChainJobConf() {
return chainJobConf;
}
/**
* Returns the first Mapper instance in the chain.
*
* @return the first Mapper instance in the chain or NULL if none.
*/
public Mapper getFirstMap() {
return (mappers.size() > 0) ? mappers.get(0) : null;
}
/**
* Returns the Reducer instance in the chain.
*
* @return the Reducer instance in the chain or NULL if none.
*/
public Reducer getReducer() {
return reducer;
}
/**
* Returns the OutputCollector to be used by a Mapper instance in the chain.
*
* @param mapperIndex index of the Mapper instance to get the OutputCollector.
* @param output the original OutputCollector of the task.
* @param reporter the reporter of the task.
* @return the OutputCollector to be used in the chain.
*/
@SuppressWarnings({"unchecked"})
public OutputCollector getMapperCollector(int mapperIndex,
OutputCollector output,
Reporter reporter) {
Serialization keySerialization = mappersKeySerialization.get(mapperIndex);
Serialization valueSerialization =
mappersValueSerialization.get(mapperIndex);
return new ChainOutputCollector(mapperIndex, keySerialization,
valueSerialization, output, reporter);
}
/**
* Returns the OutputCollector to be used by a Mapper instance in the chain.
*
* @param output the original OutputCollector of the task.
* @param reporter the reporter of the task.
* @return the OutputCollector to be used in the chain.
*/
@SuppressWarnings({"unchecked"})
public OutputCollector getReducerCollector(OutputCollector output,
Reporter reporter) {
return new ChainOutputCollector(reducerKeySerialization,
reducerValueSerialization, output,
reporter);
}
/**
* Closes all the chain elements.
*
* @throws IOException thrown if any of the chain elements throws an
* IOException.
*/
public void close() throws IOException {
for (Mapper map : mappers) {
map.close();
}
if (reducer != null) {
reducer.close();
}
}
// using a ThreadLocal to reuse the DataOutputBuffer used for ser/deser;
// it has to be a ThreadLocal because otherwise it would break when used
// from a MultiThreadedMapRunner.
private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
new ThreadLocal<DataOutputBuffer>() {
protected DataOutputBuffer initialValue() {
return new DataOutputBuffer(1024);
}
};
/**
* OutputCollector implementation used by the chain tasks.
* <p/>
* If it is not the end of the chain, a {@link #collect} invocation invokes
* the next Mapper in the chain. If it is the end of the chain the task
* OutputCollector is called.
*/
private class ChainOutputCollector<K, V> implements OutputCollector<K, V> {
private int nextMapperIndex;
private Serialization<K> keySerialization;
private Serialization<V> valueSerialization;
private OutputCollector output;
private Reporter reporter;
/*
* Constructor for Mappers
*/
public ChainOutputCollector(int index, Serialization<K> keySerialization,
Serialization<V> valueSerialization,
OutputCollector output, Reporter reporter) {
this.nextMapperIndex = index + 1;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
this.output = output;
this.reporter = reporter;
}
/*
* Constructor for Reducer
*/
public ChainOutputCollector(Serialization<K> keySerialization,
Serialization<V> valueSerialization,
OutputCollector output, Reporter reporter) {
this.nextMapperIndex = 0;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
this.output = output;
this.reporter = reporter;
}
@SuppressWarnings({"unchecked"})
public void collect(K key, V value) throws IOException {
if (nextMapperIndex < mappers.size()) {
// there is a next mapper in the chain;
// the key/value only need to be copied (ser/deser) when passing by value
if (keySerialization != null) {
key = makeCopyForPassByValue(keySerialization, key);
value = makeCopyForPassByValue(valueSerialization, value);
}
// get the ser/deser and the Mapper of the next element in the chain
Serialization nextKeySerialization =
mappersKeySerialization.get(nextMapperIndex);
Serialization nextValueSerialization =
mappersValueSerialization.get(nextMapperIndex);
Mapper nextMapper = mappers.get(nextMapperIndex);
// invoke the next mapper in the chain
nextMapper.map(key, value,
new ChainOutputCollector(nextMapperIndex,
nextKeySerialization,
nextValueSerialization,
output, reporter),
reporter);
} else {
// end of the chain, use the real output collector
output.collect(key, value);
}
}
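/**
 * Returns a copy of the given object, obtained by serializing it and
 * deserializing the bytes into a fresh instance, so the next element in
 * the chain receives the key/value by value.
 */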
private <E> E makeCopyForPassByValue(Serialization<E> serialization,
E obj) throws IOException {
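// get a serializer and a deserializer for the runtime class of the object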
Serializer<E> ser =
serialization.getSerializer(GenericsUtil.getClass(obj));
Deserializer<E> deser =
serialization.getDeserializer(GenericsUtil.getClass(obj));
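// serialize the object into the reusable thread-local buffer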
DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
dof.reset();
ser.open(dof);
ser.serialize(obj);
ser.close();
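// create a fresh instance and deserialize the buffered bytes into it,
// producing a copy the next chain element can safely modify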
obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
getChainJobConf());
ByteArrayInputStream bais =
new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
deser.open(bais);
deser.deserialize(obj);
deser.close();
return obj;
}
}
}