| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred.lib; |
| |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.Stringifier; |
| import org.apache.hadoop.io.DefaultStringifier; |
| import org.apache.hadoop.io.serializer.Deserializer; |
| import org.apache.hadoop.io.serializer.Serialization; |
| import org.apache.hadoop.io.serializer.SerializationFactory; |
| import org.apache.hadoop.io.serializer.Serializer; |
| import org.apache.hadoop.mapred.*; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.GenericsUtil; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| |
| |
| /** |
| * The Chain class provides all the common functionality for the |
| * {@link ChainMapper} and the {@link ChainReducer} classes. |
| */ |
| class Chain { |
| private static final String CHAIN_MAPPER = "chain.mapper"; |
| private static final String CHAIN_REDUCER = "chain.reducer"; |
| |
| private static final String CHAIN_MAPPER_SIZE = ".size"; |
| private static final String CHAIN_MAPPER_CLASS = ".mapper.class."; |
| private static final String CHAIN_MAPPER_CONFIG = ".mapper.config."; |
| private static final String CHAIN_REDUCER_CLASS = ".reducer.class"; |
| private static final String CHAIN_REDUCER_CONFIG = ".reducer.config"; |
| |
| private static final String MAPPER_BY_VALUE = "chain.mapper.byValue"; |
| private static final String REDUCER_BY_VALUE = "chain.reducer.byValue"; |
| |
| private static final String MAPPER_INPUT_KEY_CLASS = |
| "chain.mapper.input.key.class"; |
| private static final String MAPPER_INPUT_VALUE_CLASS = |
| "chain.mapper.input.value.class"; |
| private static final String MAPPER_OUTPUT_KEY_CLASS = |
| "chain.mapper.output.key.class"; |
| private static final String MAPPER_OUTPUT_VALUE_CLASS = |
| "chain.mapper.output.value.class"; |
| private static final String REDUCER_INPUT_KEY_CLASS = |
| "chain.reducer.input.key.class"; |
| private static final String REDUCER_INPUT_VALUE_CLASS = |
| "chain.reducer.input.value.class"; |
| private static final String REDUCER_OUTPUT_KEY_CLASS = |
| "chain.reducer.output.key.class"; |
| private static final String REDUCER_OUTPUT_VALUE_CLASS = |
| "chain.reducer.output.value.class"; |
| |
| private boolean isMap; |
| |
| private JobConf chainJobConf; |
| |
| private List<Mapper> mappers = new ArrayList<Mapper>(); |
| private Reducer reducer; |
| |
| // to cache the key/value output class serializations for each chain element |
| // to avoid everytime lookup. |
| private List<Serialization> mappersKeySerialization = |
| new ArrayList<Serialization>(); |
| private List<Serialization> mappersValueSerialization = |
| new ArrayList<Serialization>(); |
| private Serialization reducerKeySerialization; |
| private Serialization reducerValueSerialization; |
| |
| /** |
| * Creates a Chain instance configured for a Mapper or a Reducer. |
| * |
| * @param isMap TRUE indicates the chain is for a Mapper, FALSE that is for a |
| * Reducer. |
| */ |
| Chain(boolean isMap) { |
| this.isMap = isMap; |
| } |
| |
| /** |
| * Returns the prefix to use for the configuration of the chain depending |
| * if it is for a Mapper or a Reducer. |
| * |
| * @param isMap TRUE for Mapper, FALSE for Reducer. |
| * @return the prefix to use. |
| */ |
| private static String getPrefix(boolean isMap) { |
| return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER; |
| } |
| |
| /** |
| * Creates a {@link JobConf} for one of the Maps or Reduce in the chain. |
| * <p/> |
| * It creates a new JobConf using the chain job's JobConf as base and adds to |
| * it the configuration properties for the chain element. The keys of the |
| * chain element jobConf have precedence over the given JobConf. |
| * |
| * @param jobConf the chain job's JobConf. |
| * @param confKey the key for chain element configuration serialized in the |
| * chain job's JobConf. |
| * @return a new JobConf aggregating the chain job's JobConf with the chain |
| * element configuration properties. |
| */ |
| private static JobConf getChainElementConf(JobConf jobConf, String confKey) { |
| JobConf conf; |
| try { |
| Stringifier<JobConf> stringifier = |
| new DefaultStringifier<JobConf>(jobConf, JobConf.class); |
| conf = stringifier.fromString(jobConf.get(confKey, null)); |
| } catch (IOException ioex) { |
| throw new RuntimeException(ioex); |
| } |
| // we have to do this because the Writable desearialization clears all |
| // values set in the conf making not possible do do a new JobConf(jobConf) |
| // in the creation of the conf above |
| jobConf = new JobConf(jobConf); |
| |
| for(Map.Entry<String, String> entry : conf) { |
| jobConf.set(entry.getKey(), entry.getValue()); |
| } |
| return jobConf; |
| } |
| |
| /** |
| * Adds a Mapper class to the chain job's JobConf. |
| * <p/> |
| * The configuration properties of the chain job have precedence over the |
| * configuration properties of the Mapper. |
| * |
| * @param isMap indicates if the Chain is for a Mapper or for a |
| * Reducer. |
| * @param jobConf chain job's JobConf to add the Mapper class. |
| * @param klass the Mapper class to add. |
| * @param inputKeyClass mapper input key class. |
| * @param inputValueClass mapper input value class. |
| * @param outputKeyClass mapper output key class. |
| * @param outputValueClass mapper output value class. |
| * @param byValue indicates if key/values should be passed by value |
| * to the next Mapper in the chain, if any. |
| * @param mapperConf a JobConf with the configuration for the Mapper |
| * class. It is recommended to use a JobConf without default values using the |
| * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE. |
| */ |
| public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf, |
| Class<? extends Mapper<K1, V1, K2, V2>> klass, |
| Class<? extends K1> inputKeyClass, |
| Class<? extends V1> inputValueClass, |
| Class<? extends K2> outputKeyClass, |
| Class<? extends V2> outputValueClass, |
| boolean byValue, JobConf mapperConf) { |
| String prefix = getPrefix(isMap); |
| |
| // if a reducer chain check the Reducer has been already set |
| if (!isMap) { |
| if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, |
| Reducer.class) == null) { |
| throw new IllegalStateException( |
| "A Mapper can be added to the chain only after the Reducer has " + |
| "been set"); |
| } |
| } |
| int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0); |
| jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class); |
| |
| // if it is a reducer chain and the first Mapper is being added check the |
| // key and value input classes of the mapper match those of the reducer |
| // output. |
| if (!isMap && index == 0) { |
| JobConf reducerConf = |
| getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG); |
| if (! inputKeyClass.isAssignableFrom( |
| reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) { |
| throw new IllegalArgumentException("The Reducer output key class does" + |
| " not match the Mapper input key class"); |
| } |
| if (! inputValueClass.isAssignableFrom( |
| reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) { |
| throw new IllegalArgumentException("The Reducer output value class" + |
| " does not match the Mapper input value class"); |
| } |
| } else if (index > 0) { |
| // check the that the new Mapper in the chain key and value input classes |
| // match those of the previous Mapper output. |
| JobConf previousMapperConf = |
| getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + |
| (index - 1)); |
| if (! inputKeyClass.isAssignableFrom( |
| previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) { |
| throw new IllegalArgumentException("The Mapper output key class does" + |
| " not match the previous Mapper input key class"); |
| } |
| if (! inputValueClass.isAssignableFrom( |
| previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) { |
| throw new IllegalArgumentException("The Mapper output value class" + |
| " does not match the previous Mapper input value class"); |
| } |
| } |
| |
| // if the Mapper does not have a private JobConf create an empty one |
| if (mapperConf == null) { |
| // using a JobConf without defaults to make it lightweight. |
| // still the chain JobConf may have all defaults and this conf is |
| // overlapped to the chain JobConf one. |
| mapperConf = new JobConf(true); |
| } |
| |
| // store in the private mapper conf the input/output classes of the mapper |
| // and if it works by value or by reference |
| mapperConf.setBoolean(MAPPER_BY_VALUE, byValue); |
| mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class); |
| mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, |
| Object.class); |
| mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class); |
| mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass, |
| Object.class); |
| |
| // serialize the private mapper jobconf in the chain jobconf. |
| Stringifier<JobConf> stringifier = |
| new DefaultStringifier<JobConf>(jobConf, JobConf.class); |
| try { |
| jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, |
| stringifier.toString(new JobConf(mapperConf))); |
| } |
| catch (IOException ioEx) { |
| throw new RuntimeException(ioEx); |
| } |
| |
| // increment the chain counter |
| jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1); |
| } |
| |
| /** |
| * Sets the Reducer class to the chain job's JobConf. |
| * <p/> |
| * The configuration properties of the chain job have precedence over the |
| * configuration properties of the Reducer. |
| * |
| * @param jobConf chain job's JobConf to add the Reducer class. |
| * @param klass the Reducer class to add. |
| * @param inputKeyClass reducer input key class. |
| * @param inputValueClass reducer input value class. |
| * @param outputKeyClass reducer output key class. |
| * @param outputValueClass reducer output value class. |
| * @param byValue indicates if key/values should be passed by value |
| * to the next Mapper in the chain, if any. |
| * @param reducerConf a JobConf with the configuration for the Reducer |
| * class. It is recommended to use a JobConf without default values using the |
| * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE. |
| */ |
| public static <K1, V1, K2, V2> void setReducer(JobConf jobConf, |
| Class<? extends Reducer<K1, V1, K2, V2>> klass, |
| Class<? extends K1> inputKeyClass, |
| Class<? extends V1> inputValueClass, |
| Class<? extends K2> outputKeyClass, |
| Class<? extends V2> outputValueClass, |
| boolean byValue, JobConf reducerConf) { |
| String prefix = getPrefix(false); |
| |
| if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) { |
| throw new IllegalStateException("Reducer has been already set"); |
| } |
| |
| jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class); |
| |
| // if the Reducer does not have a private JobConf create an empty one |
| if (reducerConf == null) { |
| // using a JobConf without defaults to make it lightweight. |
| // still the chain JobConf may have all defaults and this conf is |
| // overlapped to the chain JobConf one. |
| reducerConf = new JobConf(false); |
| } |
| |
| // store in the private reducer conf the input/output classes of the reducer |
| // and if it works by value or by reference |
| reducerConf.setBoolean(MAPPER_BY_VALUE, byValue); |
| reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class); |
| reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass, |
| Object.class); |
| reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, |
| Object.class); |
| reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass, |
| Object.class); |
| |
| // serialize the private mapper jobconf in the chain jobconf. |
| Stringifier<JobConf> stringifier = |
| new DefaultStringifier<JobConf>(jobConf, JobConf.class); |
| try { |
| jobConf.set(prefix + CHAIN_REDUCER_CONFIG, |
| stringifier.toString(new JobConf(reducerConf))); |
| } |
| catch (IOException ioEx) { |
| throw new RuntimeException(ioEx); |
| } |
| } |
| |
| /** |
| * Configures all the chain elements for the task. |
| * |
| * @param jobConf chain job's JobConf. |
| */ |
| public void configure(JobConf jobConf) { |
| String prefix = getPrefix(isMap); |
| chainJobConf = jobConf; |
| SerializationFactory serializationFactory = |
| new SerializationFactory(chainJobConf); |
| int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0); |
| for (int i = 0; i < index; i++) { |
| Class<? extends Mapper> klass = |
| jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class); |
| JobConf mConf = |
| getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i); |
| Mapper mapper = ReflectionUtils.newInstance(klass, mConf); |
| mappers.add(mapper); |
| |
| if (mConf.getBoolean(MAPPER_BY_VALUE, true)) { |
| mappersKeySerialization.add(serializationFactory.getSerialization( |
| mConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))); |
| mappersValueSerialization.add(serializationFactory.getSerialization( |
| mConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))); |
| } else { |
| mappersKeySerialization.add(null); |
| mappersValueSerialization.add(null); |
| } |
| } |
| Class<? extends Reducer> klass = |
| jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class); |
| if (klass != null) { |
| JobConf rConf = |
| getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG); |
| reducer = ReflectionUtils.newInstance(klass, rConf); |
| if (rConf.getBoolean(REDUCER_BY_VALUE, true)) { |
| reducerKeySerialization = serializationFactory |
| .getSerialization(rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null)); |
| reducerValueSerialization = serializationFactory |
| .getSerialization(rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null)); |
| } else { |
| reducerKeySerialization = null; |
| reducerValueSerialization = null; |
| } |
| } |
| } |
| |
| /** |
| * Returns the chain job conf. |
| * |
| * @return the chain job conf. |
| */ |
| protected JobConf getChainJobConf() { |
| return chainJobConf; |
| } |
| |
| /** |
| * Returns the first Mapper instance in the chain. |
| * |
| * @return the first Mapper instance in the chain or NULL if none. |
| */ |
| public Mapper getFirstMap() { |
| return (mappers.size() > 0) ? mappers.get(0) : null; |
| } |
| |
| /** |
| * Returns the Reducer instance in the chain. |
| * |
| * @return the Reducer instance in the chain or NULL if none. |
| */ |
| public Reducer getReducer() { |
| return reducer; |
| } |
| |
| /** |
| * Returns the OutputCollector to be used by a Mapper instance in the chain. |
| * |
| * @param mapperIndex index of the Mapper instance to get the OutputCollector. |
| * @param output the original OutputCollector of the task. |
| * @param reporter the reporter of the task. |
| * @return the OutputCollector to be used in the chain. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| public OutputCollector getMapperCollector(int mapperIndex, |
| OutputCollector output, |
| Reporter reporter) { |
| Serialization keySerialization = mappersKeySerialization.get(mapperIndex); |
| Serialization valueSerialization = |
| mappersValueSerialization.get(mapperIndex); |
| return new ChainOutputCollector(mapperIndex, keySerialization, |
| valueSerialization, output, reporter); |
| } |
| |
| /** |
| * Returns the OutputCollector to be used by a Mapper instance in the chain. |
| * |
| * @param output the original OutputCollector of the task. |
| * @param reporter the reporter of the task. |
| * @return the OutputCollector to be used in the chain. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| public OutputCollector getReducerCollector(OutputCollector output, |
| Reporter reporter) { |
| return new ChainOutputCollector(reducerKeySerialization, |
| reducerValueSerialization, output, |
| reporter); |
| } |
| |
| /** |
| * Closes all the chain elements. |
| * |
| * @throws IOException thrown if any of the chain elements threw an |
| * IOException exception. |
| */ |
| public void close() throws IOException { |
| for (Mapper map : mappers) { |
| map.close(); |
| } |
| if (reducer != null) { |
| reducer.close(); |
| } |
| } |
| |
| // using a ThreadLocal to reuse the ByteArrayOutputStream used for ser/deser |
| // it has to be a thread local because if not it would break if used from a |
| // MultiThreadedMapRunner. |
| private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer = |
| new ThreadLocal<DataOutputBuffer>() { |
| protected DataOutputBuffer initialValue() { |
| return new DataOutputBuffer(1024); |
| } |
| }; |
| |
| /** |
| * OutputCollector implementation used by the chain tasks. |
| * <p/> |
| * If it is not the end of the chain, a {@link #collect} invocation invokes |
| * the next Mapper in the chain. If it is the end of the chain the task |
| * OutputCollector is called. |
| */ |
| private class ChainOutputCollector<K, V> implements OutputCollector<K, V> { |
| private int nextMapperIndex; |
| private Serialization<K> keySerialization; |
| private Serialization<V> valueSerialization; |
| private OutputCollector output; |
| private Reporter reporter; |
| |
| /* |
| * Constructor for Mappers |
| */ |
| public ChainOutputCollector(int index, Serialization<K> keySerialization, |
| Serialization<V> valueSerialization, |
| OutputCollector output, Reporter reporter) { |
| this.nextMapperIndex = index + 1; |
| this.keySerialization = keySerialization; |
| this.valueSerialization = valueSerialization; |
| this.output = output; |
| this.reporter = reporter; |
| } |
| |
| /* |
| * Constructor for Reducer |
| */ |
| public ChainOutputCollector(Serialization<K> keySerialization, |
| Serialization<V> valueSerialization, |
| OutputCollector output, Reporter reporter) { |
| this.nextMapperIndex = 0; |
| this.keySerialization = keySerialization; |
| this.valueSerialization = valueSerialization; |
| this.output = output; |
| this.reporter = reporter; |
| } |
| |
| @SuppressWarnings({"unchecked"}) |
| public void collect(K key, V value) throws IOException { |
| if (nextMapperIndex < mappers.size()) { |
| // there is a next mapper in chain |
| |
| // only need to ser/deser if there is next mapper in the chain |
| if (keySerialization != null) { |
| key = makeCopyForPassByValue(keySerialization, key); |
| value = makeCopyForPassByValue(valueSerialization, value); |
| } |
| |
| // gets ser/deser and mapper of next in chain |
| Serialization nextKeySerialization = |
| mappersKeySerialization.get(nextMapperIndex); |
| Serialization nextValueSerialization = |
| mappersValueSerialization.get(nextMapperIndex); |
| Mapper nextMapper = mappers.get(nextMapperIndex); |
| |
| // invokes next mapper in chain |
| nextMapper.map(key, value, |
| new ChainOutputCollector(nextMapperIndex, |
| nextKeySerialization, |
| nextValueSerialization, |
| output, reporter), |
| reporter); |
| } else { |
| // end of chain, user real output collector |
| output.collect(key, value); |
| } |
| } |
| |
| private <E> E makeCopyForPassByValue(Serialization<E> serialization, |
| E obj) throws IOException { |
| Serializer<E> ser = |
| serialization.getSerializer(GenericsUtil.getClass(obj)); |
| Deserializer<E> deser = |
| serialization.getDeserializer(GenericsUtil.getClass(obj)); |
| |
| DataOutputBuffer dof = threadLocalDataOutputBuffer.get(); |
| |
| dof.reset(); |
| ser.open(dof); |
| ser.serialize(obj); |
| ser.close(); |
| obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj), |
| getChainJobConf()); |
| ByteArrayInputStream bais = |
| new ByteArrayInputStream(dof.getData(), 0, dof.getLength()); |
| deser.open(bais); |
| deser.deserialize(obj); |
| deser.close(); |
| return obj; |
| } |
| |
| } |
| |
| } |