MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with fallback. (Todd Lipcon via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1619492 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5cc965a..cd4d6a5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -187,6 +187,9 @@
MAPREDUCE-5906. Inconsistent configuration in property
"mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw)
+ MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with
+ fallback. (Todd Lipcon via kasha)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index b533ebe..dfcbe09 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -381,16 +381,35 @@
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
- MapOutputCollector<KEY, VALUE> collector
- = (MapOutputCollector<KEY, VALUE>)
- ReflectionUtils.newInstance(
- job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
- MapOutputBuffer.class, MapOutputCollector.class), job);
- LOG.info("Map output collector class = " + collector.getClass().getName());
MapOutputCollector.Context context =
- new MapOutputCollector.Context(this, job, reporter);
- collector.init(context);
- return collector;
+ new MapOutputCollector.Context(this, job, reporter);
+
+ Class<?>[] collectorClasses = job.getClasses(
+ JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
+ int remainingCollectors = collectorClasses.length;
+ for (Class clazz : collectorClasses) {
+ try {
+ if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
+ throw new IOException("Invalid output collector class: " + clazz.getName() +
+ " (does not implement MapOutputCollector)");
+ }
+ Class<? extends MapOutputCollector> subclazz =
+ clazz.asSubclass(MapOutputCollector.class);
+ LOG.debug("Trying map output collector class: " + subclazz.getName());
+ MapOutputCollector<KEY, VALUE> collector =
+ ReflectionUtils.newInstance(subclazz, job);
+ collector.init(context);
+ LOG.info("Map output collector class = " + collector.getClass().getName());
+ return collector;
+ } catch (Exception e) {
+ String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
+ if (--remainingCollectors > 0) {
+ msg += " (" + remainingCollectors + " more collector(s) to try)";
+ }
+ LOG.warn(msg, e);
+ }
+ }
+ throw new IOException("Unable to initialize any output collector");
}
@SuppressWarnings("unchecked")
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 110e316..b2503c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -408,7 +408,9 @@
<name>mapreduce.job.map.output.collector.class</name>
<value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
<description>
- It defines the MapOutputCollector implementation to use.
+ The MapOutputCollector implementation(s) to use. This may be a comma-separated
+ list of class names, in which case the map task will try to initialize each
+ of the collectors in turn. The first to successfully initialize will be used.
</description>
</property>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
index 1b06ca9..06d8022 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
@@ -71,11 +71,16 @@
*--------------------------------------+---------------------+-----------------+
| <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>> | The <<<ShuffleConsumerPlugin>>> implementation to use |
*--------------------------------------+---------------------+-----------------+
-| <<<mapreduce.job.map.output.collector.class>>> | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation to use |
+| <<<mapreduce.job.map.output.collector.class>>> | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation(s) to use |
*--------------------------------------+---------------------+-----------------+
These properties can also be set in the <<<mapred-site.xml>>> to change the default values for all jobs.
+ The collector class configuration may specify a comma-separated list of collector implementations.
+ In this case, the map task will attempt to instantiate each in turn until one of the
+ implementations successfully initializes. This can be useful if a given collector
+ implementation is only compatible with certain types of keys or values, for example.
+
** NodeManager Configuration properties, <<<yarn-site.xml>>> in all nodes:
*--------------------------------------+---------------------+-----------------+
@@ -91,4 +96,3 @@
<<<yarn.nodemanager.aux-services>>> property, for example <<<mapred.shufflex>>>.
Then the property defining the corresponding class must be
<<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>.
-
\ No newline at end of file