MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with fallback. (Todd Lipcon via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1619492 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5cc965a..cd4d6a5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -187,6 +187,9 @@
     MAPREDUCE-5906. Inconsistent configuration in property
       "mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw)
 
+    MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with 
+    fallback. (Todd Lipcon via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index b533ebe..dfcbe09 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -381,16 +381,35 @@
   private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
           createSortingCollector(JobConf job, TaskReporter reporter)
     throws IOException, ClassNotFoundException {
-    MapOutputCollector<KEY, VALUE> collector
-      = (MapOutputCollector<KEY, VALUE>)
-       ReflectionUtils.newInstance(
-                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
-                        MapOutputBuffer.class, MapOutputCollector.class), job);
-    LOG.info("Map output collector class = " + collector.getClass().getName());
     MapOutputCollector.Context context =
-                           new MapOutputCollector.Context(this, job, reporter);
-    collector.init(context);
-    return collector;
+      new MapOutputCollector.Context(this, job, reporter);
+
+    Class<?>[] collectorClasses = job.getClasses(
+      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
+    int remainingCollectors = collectorClasses.length;
+    for (Class clazz : collectorClasses) {
+      try {
+        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
+          throw new IOException("Invalid output collector class: " + clazz.getName() +
+            " (does not implement MapOutputCollector)");
+        }
+        Class<? extends MapOutputCollector> subclazz =
+          clazz.asSubclass(MapOutputCollector.class);
+        LOG.debug("Trying map output collector class: " + subclazz.getName());
+        MapOutputCollector<KEY, VALUE> collector =
+          ReflectionUtils.newInstance(subclazz, job);
+        collector.init(context);
+        LOG.info("Map output collector class = " + collector.getClass().getName());
+        return collector;
+      } catch (Exception e) {
+        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
+        if (--remainingCollectors > 0) {
+          msg += " (" + remainingCollectors + " more collector(s) to try)";
+        }
+        LOG.warn(msg, e);
+      }
+    }
+    throw new IOException("Unable to initialize any output collector");
   }
 
   @SuppressWarnings("unchecked")
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 110e316..b2503c7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -408,7 +408,9 @@
   <name>mapreduce.job.map.output.collector.class</name>
   <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
   <description>
-    It defines the MapOutputCollector implementation to use.
+    The MapOutputCollector implementation(s) to use. This may be a comma-separated
+    list of class names, in which case the map task will try to initialize each
+    of the collectors in turn. The first to successfully initialize will be used.
   </description>
 </property>
  
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
index 1b06ca9..06d8022 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
@@ -71,11 +71,16 @@
 *--------------------------------------+---------------------+-----------------+
 | <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>>         | The <<<ShuffleConsumerPlugin>>> implementation to use |
 *--------------------------------------+---------------------+-----------------+
-| <<<mapreduce.job.map.output.collector.class>>>   | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation to use |
+| <<<mapreduce.job.map.output.collector.class>>>   | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation(s) to use |
 *--------------------------------------+---------------------+-----------------+
 
   These properties can also be set in the <<<mapred-site.xml>>> to change the default values for all jobs.
 
+  The collector class configuration may specify a comma-separated list of collector implementations.
+  In this case, the map task will attempt to instantiate each in turn until one of the
+  implementations successfully initializes. This can be useful if a given collector
+  implementation is only compatible with certain types of keys or values, for example.
+
 ** NodeManager Configuration properties, <<<yarn-site.xml>>> in all nodes:
 
 *--------------------------------------+---------------------+-----------------+
@@ -91,4 +96,3 @@
   <<<yarn.nodemanager.aux-services>>> property, for example <<<mapred.shufflex>>>.
   Then the property defining the corresponding class must be
   <<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>.
-  
\ No newline at end of file