Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2454@1418161 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index efd23cd..7621a6e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1,5 +1,12 @@
Hadoop MapReduce Change Log
+Branch MR-2454
+
+ MAPREDUCE-4809. Make internal classes required for MAPREDUCE-2454
+ public. (masokan via tucu)
+
+ MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
+
Trunk (Unreleased)
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
new file mode 100644
index 0000000..3996534
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexRecord.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class IndexRecord {
+ public long startOffset;
+ public long rawLength;
+ public long partLength;
+
+ public IndexRecord() { }
+
+ public IndexRecord(long startOffset, long rawLength, long partLength) {
+ this.startOffset = startOffset;
+ this.rawLength = rawLength;
+ this.partLength = partLength;
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
new file mode 100644
index 0000000..368c016
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapOutputCollector.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public interface MapOutputCollector<K, V> {
+ public void init(Context context
+ ) throws IOException, ClassNotFoundException;
+ public void collect(K key, V value, int partition
+ ) throws IOException, InterruptedException;
+ public void close() throws IOException, InterruptedException;
+
+ public void flush() throws IOException, InterruptedException,
+ ClassNotFoundException;
+
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
+ @InterfaceStability.Unstable
+ public static class Context {
+ private final MapTask mapTask;
+ private final JobConf jobConf;
+ private final TaskReporter reporter;
+
+ public Context(MapTask mapTask, JobConf jobConf, TaskReporter reporter) {
+ this.mapTask = mapTask;
+ this.jobConf = jobConf;
+ this.reporter = reporter;
+ }
+
+ public MapTask getMapTask() {
+ return mapTask;
+ }
+
+ public JobConf getJobConf() {
+ return jobConf;
+ }
+
+ public TaskReporter getReporter() {
+ return reporter;
+ }
+ }
+}
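For illustration only, a minimal plugin against the interface added above might look like the following sketch. The package com.example, the class name CountingCollector, and its record-counting behavior are hypothetical and not part of this patch; a real collector must also produce partitioned output that the reduce side can fetch (the MapOutputCopier class in the new TestMerge.java further down is a working example of that).

package com.example;

import java.io.IOException;

import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.Task.TaskReporter;

public class CountingCollector<K, V> implements MapOutputCollector<K, V> {
  private TaskReporter reporter;
  private long records = 0;

  // The framework creates the collector via ReflectionUtils.newInstance,
  // so a public no-argument constructor is required.
  public CountingCollector() { }

  public void init(MapOutputCollector.Context context)
      throws IOException, ClassNotFoundException {
    // The context also exposes the owning MapTask and the JobConf.
    this.reporter = context.getReporter();
  }

  public void collect(K key, V value, int partition)
      throws IOException, InterruptedException {
    // A real collector would serialize the pair into the given partition.
    records++;
    reporter.progress();
  }

  public void flush() throws IOException, InterruptedException,
      ClassNotFoundException {
    // Invoked once after the mapper finishes, before close().
  }

  public void close() throws IOException, InterruptedException { }
}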
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 16fb4d2..cd4ba51 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -34,6 +34,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +56,7 @@
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
@@ -71,7 +74,9 @@
import org.apache.hadoop.util.StringUtils;
/** A Map task. */
-class MapTask extends Task {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapTask extends Task {
/**
* The size of each record in the index file for the map-outputs.
*/
@@ -338,6 +343,10 @@
done(umbilical, reporter);
}
+ public Progress getSortPhase() {
+ return sortPhase;
+ }
+
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset)
throws IOException {
@@ -367,6 +376,22 @@
}
@SuppressWarnings("unchecked")
+ private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
+ createSortingCollector(JobConf job, TaskReporter reporter)
+ throws IOException, ClassNotFoundException {
+ MapOutputCollector<KEY, VALUE> collector
+ = (MapOutputCollector<KEY, VALUE>)
+ ReflectionUtils.newInstance(
+ job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+ MapOutputBuffer.class, MapOutputCollector.class), job);
+ LOG.info("Map output collector class = " + collector.getClass().getName());
+ MapOutputCollector.Context context =
+ new MapOutputCollector.Context(this, job, reporter);
+ collector.init(context);
+ return collector;
+ }
+
+ @SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldMapper(final JobConf job,
final TaskSplitIndex splitIndex,
@@ -388,11 +413,14 @@
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
- MapOutputCollector collector = null;
+ MapOutputCollector<OUTKEY, OUTVALUE> collector = null;
if (numReduceTasks > 0) {
- collector = new MapOutputBuffer(umbilical, job, reporter);
+ collector = createSortingCollector(job, reporter);
} else {
- collector = new DirectMapOutputCollector(umbilical, job, reporter);
+ collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>();
+ MapOutputCollector.Context context =
+ new MapOutputCollector.Context(this, job, reporter);
+ collector.init(context);
}
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
@@ -638,7 +666,7 @@
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
- collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+ collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
@@ -734,17 +762,6 @@
output.close(mapperContext);
}
- interface MapOutputCollector<K, V> {
-
- public void collect(K key, V value, int partition
- ) throws IOException, InterruptedException;
- public void close() throws IOException, InterruptedException;
-
- public void flush() throws IOException, InterruptedException,
- ClassNotFoundException;
-
- }
-
class DirectMapOutputCollector<K, V>
implements MapOutputCollector<K, V> {
@@ -752,14 +769,18 @@
private TaskReporter reporter = null;
- private final Counters.Counter mapOutputRecordCounter;
- private final Counters.Counter fileOutputByteCounter;
- private final List<Statistics> fsStats;
+ private Counters.Counter mapOutputRecordCounter;
+ private Counters.Counter fileOutputByteCounter;
+ private List<Statistics> fsStats;
+
+ public DirectMapOutputCollector() {
+ }
@SuppressWarnings("unchecked")
- public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
- JobConf job, TaskReporter reporter) throws IOException {
- this.reporter = reporter;
+ public void init(MapOutputCollector.Context context
+ ) throws IOException, ClassNotFoundException {
+ this.reporter = context.getReporter();
+ JobConf job = context.getJobConf();
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
@@ -814,25 +835,27 @@
}
}
- private class MapOutputBuffer<K extends Object, V extends Object>
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
+ @InterfaceStability.Unstable
+ public static class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
- final int partitions;
- final JobConf job;
- final TaskReporter reporter;
- final Class<K> keyClass;
- final Class<V> valClass;
- final RawComparator<K> comparator;
- final SerializationFactory serializationFactory;
- final Serializer<K> keySerializer;
- final Serializer<V> valSerializer;
- final CombinerRunner<K,V> combinerRunner;
- final CombineOutputCollector<K, V> combineCollector;
+ private int partitions;
+ private JobConf job;
+ private TaskReporter reporter;
+ private Class<K> keyClass;
+ private Class<V> valClass;
+ private RawComparator<K> comparator;
+ private SerializationFactory serializationFactory;
+ private Serializer<K> keySerializer;
+ private Serializer<V> valSerializer;
+ private CombinerRunner<K,V> combinerRunner;
+ private CombineOutputCollector<K, V> combineCollector;
// Compression for map-outputs
- final CompressionCodec codec;
+ private CompressionCodec codec;
// k/v accounting
- final IntBuffer kvmeta; // metadata overlay on backing store
+ private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart; // marks origin of spill metadata
int kvend; // marks end of spill metadata
int kvindex; // marks end of fully serialized records
@@ -856,15 +879,15 @@
private static final int METASIZE = NMETA * 4; // size in bytes
// spill accounting
- final int maxRec;
- final int softLimit;
+ private int maxRec;
+ private int softLimit;
    boolean spillInProgress;
int bufferRemaining;
volatile Throwable sortSpillException = null;
int numSpills = 0;
- final int minSpillsForCombine;
- final IndexedSorter sorter;
+ private int minSpillsForCombine;
+ private IndexedSorter sorter;
final ReentrantLock spillLock = new ReentrantLock();
final Condition spillDone = spillLock.newCondition();
final Condition spillReady = spillLock.newCondition();
@@ -872,12 +895,12 @@
volatile boolean spillThreadRunning = false;
final SpillThread spillThread = new SpillThread();
- final FileSystem rfs;
+ private FileSystem rfs;
// Counters
- final Counters.Counter mapOutputByteCounter;
- final Counters.Counter mapOutputRecordCounter;
- final Counters.Counter fileOutputByteCounter;
+ private Counters.Counter mapOutputByteCounter;
+ private Counters.Counter mapOutputRecordCounter;
+ private Counters.Counter fileOutputByteCounter;
final ArrayList<SpillRecord> indexCacheList =
new ArrayList<SpillRecord>();
@@ -885,12 +908,23 @@
private int indexCacheMemoryLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
+ private MapTask mapTask;
+ private MapOutputFile mapOutputFile;
+ private Progress sortPhase;
+ private Counters.Counter spilledRecordsCounter;
+
+ public MapOutputBuffer() {
+ }
+
@SuppressWarnings("unchecked")
- public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- TaskReporter reporter
- ) throws IOException, ClassNotFoundException {
- this.job = job;
- this.reporter = reporter;
+ public void init(MapOutputCollector.Context context
+ ) throws IOException, ClassNotFoundException {
+ job = context.getJobConf();
+ reporter = context.getReporter();
+ mapTask = context.getMapTask();
+ mapOutputFile = mapTask.getMapOutputFile();
+ sortPhase = mapTask.getSortPhase();
+ spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
@@ -967,7 +1001,7 @@
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
- combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, conf);
+ combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
@@ -1118,6 +1152,10 @@
}
}
+ private TaskAttemptID getTaskID() {
+ return mapTask.getTaskID();
+ }
+
/**
* Set the point from which meta and serialization data expand. The meta
* indices are aligned with the buffer, so metadata never spans the ends of
@@ -1490,7 +1528,7 @@
if (lspillException instanceof Error) {
final String logMsg = "Task " + getTaskID() + " failed : " +
StringUtils.stringifyException(lspillException);
- reportFatalError(getTaskID(), lspillException, logMsg);
+ mapTask.reportFatalError(getTaskID(), lspillException, logMsg);
}
throw new IOException("Spill failed", lspillException);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
index dc67335..93a2d04 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
@@ -26,6 +26,8 @@
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,7 +36,9 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.PureJavaCrc32;
-class SpillRecord {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class SpillRecord {
/** Backing store */
private final ByteBuffer buf;
@@ -143,17 +147,3 @@
}
}
-
-class IndexRecord {
- long startOffset;
- long rawLength;
- long partLength;
-
- public IndexRecord() { }
-
- public IndexRecord(long startOffset, long rawLength, long partLength) {
- this.startOffset = startOffset;
- this.rawLength = rawLength;
- this.partLength = partLength;
- }
-}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 4034029..ce53da8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -584,9 +584,9 @@
return status;
}
- @InterfaceAudience.Private
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
- protected class TaskReporter
+ public class TaskReporter
extends org.apache.hadoop.mapreduce.StatusReporter
implements Runnable, Reporter {
private TaskUmbilicalProtocol umbilical;
@@ -1466,9 +1466,9 @@
return reducerContext;
}
- @InterfaceAudience.Private
+ @InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
- protected static abstract class CombinerRunner<K,V> {
+ public static abstract class CombinerRunner<K,V> {
protected final Counters.Counter inputCounter;
protected final JobConf job;
protected final TaskReporter reporter;
@@ -1486,13 +1486,13 @@
* @param iterator the key/value pairs to use as input
* @param collector the output collector
*/
- abstract void combine(RawKeyValueIterator iterator,
+ public abstract void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector
) throws IOException, InterruptedException,
ClassNotFoundException;
@SuppressWarnings("unchecked")
- static <K,V>
+ public static <K,V>
CombinerRunner<K,V> create(JobConf job,
TaskAttemptID taskId,
Counters.Counter inputCounter,
@@ -1542,7 +1542,7 @@
}
@SuppressWarnings("unchecked")
- protected void combine(RawKeyValueIterator kvIter,
+ public void combine(RawKeyValueIterator kvIter,
OutputCollector<K,V> combineCollector
) throws IOException {
Reducer<K,V,K,V> combiner =
@@ -1611,7 +1611,7 @@
@SuppressWarnings("unchecked")
@Override
- void combine(RawKeyValueIterator iterator,
+ public void combine(RawKeyValueIterator iterator,
OutputCollector<K,V> collector
) throws IOException, InterruptedException,
ClassNotFoundException {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index b086a12..df15673 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -30,6 +30,9 @@
public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+ public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
+ = "mapreduce.job.map.output.collector.class";
+
public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
index f893cec..34a9a42 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
@@ -17,9 +17,14 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
* An interface for reporting exceptions to other threads
*/
-interface ExceptionReporter {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public interface ExceptionReporter {
void reportException(Throwable t);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
index 7c5e621..935931d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
@@ -20,9 +20,14 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
import org.apache.hadoop.mapreduce.TaskAttemptID;
-class MapHost {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapHost {
public static enum State {
IDLE, // No map outputs available
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
index aab0ccc..fbe7096 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
@@ -24,6 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -33,7 +35,9 @@
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-class MapOutput<K,V> {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MapOutput<K,V> {
private static final Log LOG = LogFactory.getLog(MapOutput.class);
private static AtomicInteger ID = new AtomicInteger(0);
@@ -62,7 +66,7 @@
private final boolean primaryMapOutput;
- MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
+ public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size,
JobConf conf, LocalDirAllocator localDirAllocator,
int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
throws IOException {
@@ -87,7 +91,7 @@
this.primaryMapOutput = primaryMapOutput;
}
- MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size,
+ public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size,
boolean primaryMapOutput) {
this.id = ID.incrementAndGet();
this.mapId = mapId;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
index 29503ce..6412f12 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
@@ -59,7 +59,7 @@
import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings(value={"unchecked", "deprecation"})
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class MergeManager<K, V> {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index e582d28..f6c44e4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -38,7 +38,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
index d327aa4..92c69a6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -25,7 +28,9 @@
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
-class ShuffleClientMetrics implements Updater {
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class ShuffleClientMetrics implements Updater {
private MetricsRecord shuffleMetrics = null;
private int numFailedFetches = 0;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 8533045..9367f3c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -928,4 +928,12 @@
<value>jhs/_HOST@REALM.TLD</value>
</property>
+<property>
+ <name>mapreduce.job.map.output.collector.class</name>
+ <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+ <description>
+   The MapOutputCollector implementation to use.
+ </description>
+</property>
+
</configuration>
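As the default value above shows, the framework's sorting collector (MapTask$MapOutputBuffer) remains the out-of-the-box behavior. A job can override it by setting this property, either in configuration files or programmatically, as the new TestMerge below does for its MapOutputCopier. A minimal sketch, assuming the hypothetical plugin class com.example.CountingCollector from the earlier example:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

JobConf job = new JobConf();
// MAP_OUTPUT_COLLECTOR_CLASS_ATTR is the constant added to MRJobConfig in
// this patch; its value is "mapreduce.job.map.output.collector.class".
job.set(MRJobConfig.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
    "com.example.CountingCollector");

Note that the pluggable path is taken only when the job has reducers; per the MapTask.java hunk above, map-only jobs still use DirectMapOutputCollector.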
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
new file mode 100644
index 0000000..46a514f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+
+import org.apache.hadoop.mapred.Task.TaskReporter;
+
+import junit.framework.TestCase;
+
+/**
+ * This test verifies support for a merge operation in Hadoop. The input
+ * files are already sorted on the key. The test provides an external
+ * MapOutputCollector implementation that copies records to different
+ * partitions while maintaining the sort order within each partition. The
+ * Hadoop framework's merge on the reduce side then merges the partitions
+ * to produce the final output, which is sorted on the key.
+ */
+@SuppressWarnings(value={"unchecked", "deprecation"})
+public class TestMerge extends TestCase {
+ private static final int NUM_HADOOP_DATA_NODES = 2;
+  // The number of input files is the same as the number of mappers.
+ private static final int NUM_MAPPERS = 10;
+ // Number of reducers.
+ private static final int NUM_REDUCERS = 4;
+ // Number of lines per input file.
+ private static final int NUM_LINES = 1000;
+ // Where MR job's input will reside.
+ private static final Path INPUT_DIR = new Path("/testplugin/input");
+ // Where output goes.
+ private static final Path OUTPUT = new Path("/testplugin/output");
+
+ public void testMerge() throws Exception {
+ MiniDFSCluster dfsCluster = null;
+ MiniMRClientCluster mrCluster = null;
+ FileSystem fileSystem = null;
+ try {
+ Configuration conf = new Configuration();
+ // Start the mini-MR and mini-DFS clusters
+ dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_DATA_NODES, true, null);
+ fileSystem = dfsCluster.getFileSystem();
+ mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
+ NUM_HADOOP_DATA_NODES, conf);
+ // Generate input.
+ createInput(fileSystem);
+ // Run the test.
+ runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
+ } finally {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ if (mrCluster != null) {
+ mrCluster.stop();
+ }
+ }
+ }
+
+ private void createInput(FileSystem fs) throws Exception {
+ fs.delete(INPUT_DIR, true);
+ for (int i = 0; i < NUM_MAPPERS; i++) {
+ OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
+ Writer writer = new OutputStreamWriter(os);
+ for (int j = 0; j < NUM_LINES; j++) {
+ // Create sorted key, value pairs.
+ int k = j + 1;
+ String formattedNumber = String.format("%09d", k);
+ writer.write(formattedNumber + " " + formattedNumber + "\n");
+ }
+ writer.close();
+ }
+ }
+
+ private void runMergeTest(JobConf job, FileSystem fileSystem)
+ throws Exception {
+ // Delete any existing output.
+ fileSystem.delete(OUTPUT, true);
+ job.setJobName("MergeTest");
+ JobClient client = new JobClient(job);
+ RunningJob submittedJob = null;
+ FileInputFormat.setInputPaths(job, INPUT_DIR);
+ FileOutputFormat.setOutputPath(job, OUTPUT);
+ job.set("mapreduce.output.textoutputformat.separator", " ");
+ job.setInputFormat(TextInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setMapperClass(MyMapper.class);
+ job.setPartitionerClass(MyPartitioner.class);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setNumReduceTasks(NUM_REDUCERS);
+ job.set(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
+ MapOutputCopier.class.getName());
+ try {
+ submittedJob = client.submitJob(job);
+ try {
+ if (! client.monitorAndPrintJob(job, submittedJob)) {
+ throw new IOException("Job failed!");
+ }
+ } catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ } catch(IOException ioe) {
+ System.err.println("Job failed with: " + ioe);
+ } finally {
+ verifyOutput(submittedJob, fileSystem);
+ }
+ }
+
+ private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem)
+ throws Exception {
+ FSDataInputStream dis = null;
+ long numValidRecords = 0;
+ long numInvalidRecords = 0;
+ long numMappersLaunched = NUM_MAPPERS;
+ String prevKeyValue = "000000000";
+ Path[] fileList =
+ FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
+ for (Path outFile : fileList) {
+ try {
+ dis = fileSystem.open(outFile);
+ String record;
+ while((record = dis.readLine()) != null) {
+ // Split the line into key and value.
+ int blankPos = record.indexOf(" ");
+ String keyString = record.substring(0, blankPos);
+ String valueString = record.substring(blankPos+1);
+ // Check for sorted output and correctness of record.
+ if (keyString.compareTo(prevKeyValue) >= 0
+ && keyString.equals(valueString)) {
+ prevKeyValue = keyString;
+ numValidRecords++;
+ } else {
+ numInvalidRecords++;
+ }
+ }
+ } finally {
+ if (dis != null) {
+ dis.close();
+ dis = null;
+ }
+ }
+ }
+ // Make sure we got all input records in the output in sorted order.
+ assertEquals((long)(NUM_MAPPERS*NUM_LINES), numValidRecords);
+ // Make sure there is no extraneous invalid record.
+ assertEquals(0, numInvalidRecords);
+ }
+
+ /**
+   * A mapper implementation that assumes the key text contains valid
+   * integers in displayable form.
+ */
+ public static class MyMapper extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, Text> {
+ private Text keyText;
+ private Text valueText;
+
+ public MyMapper() {
+ keyText = new Text();
+ valueText = new Text();
+ }
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ String record = value.toString();
+ int blankPos = record.indexOf(" ");
+ keyText.set(record.substring(0, blankPos));
+ valueText.set(record.substring(blankPos+1));
+ output.collect(keyText, valueText);
+ }
+
+ public void close() throws IOException {
+ }
+ }
+
+ /**
+ * Partitioner implementation to make sure that output is in total sorted
+ * order. We basically route key ranges to different reducers such that
+ * key values monotonically increase with the partition number. For example,
+ * in this test, the keys are numbers from 1 to 1000 in the form "000000001"
+ * to "000001000" in each input file. The keys "000000001" to "000000250" are
+ * routed to partition 0, "000000251" to "000000500" are routed to partition 1
+ * and so on since we have 4 reducers.
+ */
+ static class MyPartitioner implements Partitioner<Text, Text> {
+ public MyPartitioner() {
+ }
+
+ public void configure(JobConf job) {
+ }
+
+ public int getPartition(Text key, Text value, int numPartitions) {
+ int keyValue = 0;
+ try {
+ keyValue = Integer.parseInt(key.toString());
+ } catch(NumberFormatException nfe) {
+ keyValue = 0;
+ }
+ int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/NUM_LINES;
+ return partitionNumber;
+ }
+ }
+
+ /**
+   * A map-side implementation of MapOutputCollector that avoids sorting: it
+   * maintains keys in the input order within each partition created for the
+   * reducers.
+ */
+ static class MapOutputCopier<K, V>
+ implements MapOutputCollector<K, V> {
+ private static final int BUF_SIZE = 128*1024;
+ private MapTask mapTask;
+ private JobConf jobConf;
+ private TaskReporter reporter;
+ private int numberOfPartitions;
+ private Class<K> keyClass;
+ private Class<V> valueClass;
+ private KeyValueWriter<K, V> recordWriters[];
+ private ByteArrayOutputStream outStreams[];
+
+ public MapOutputCopier() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init(MapOutputCollector.Context context)
+ throws IOException, ClassNotFoundException {
+ this.mapTask = context.getMapTask();
+ this.jobConf = context.getJobConf();
+ this.reporter = context.getReporter();
+ numberOfPartitions = jobConf.getNumReduceTasks();
+ keyClass = (Class<K>)jobConf.getMapOutputKeyClass();
+ valueClass = (Class<V>)jobConf.getMapOutputValueClass();
+ recordWriters = new KeyValueWriter[numberOfPartitions];
+ outStreams = new ByteArrayOutputStream[numberOfPartitions];
+
+ // Create output streams for partitions.
+ for (int i = 0; i < numberOfPartitions; i++) {
+ outStreams[i] = new ByteArrayOutputStream();
+ recordWriters[i] = new KeyValueWriter<K, V>(jobConf, outStreams[i],
+ keyClass, valueClass);
+ }
+ }
+
+ public synchronized void collect(K key, V value, int partitionNumber
+ ) throws IOException, InterruptedException {
+ if (partitionNumber >= 0 && partitionNumber < numberOfPartitions) {
+ recordWriters[partitionNumber].write(key, value);
+ } else {
+ throw new IOException("Invalid partition number: " + partitionNumber);
+ }
+ reporter.progress();
+ }
+
+ public void close() throws IOException, InterruptedException {
+ long totalSize = 0;
+ for (int i = 0; i < numberOfPartitions; i++) {
+ recordWriters[i].close();
+ outStreams[i].close();
+ totalSize += outStreams[i].size();
+ }
+ MapOutputFile mapOutputFile = mapTask.getMapOutputFile();
+ Path finalOutput = mapOutputFile.getOutputFileForWrite(totalSize);
+ Path indexPath = mapOutputFile.getOutputIndexFileForWrite(
+ numberOfPartitions*mapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ // Copy partitions to final map output.
+ copyPartitions(finalOutput, indexPath);
+ }
+
+ public void flush() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ }
+
+ private void copyPartitions(Path mapOutputPath, Path indexPath)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(jobConf);
+ FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
+ FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE);
+ SpillRecord spillRecord = new SpillRecord(numberOfPartitions);
+ IndexRecord indexRecord = new IndexRecord();
+ for (int i = 0; i < numberOfPartitions; i++) {
+ indexRecord.startOffset = rawOutput.getPos();
+ byte buffer[] = outStreams[i].toByteArray();
+ IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput);
+ checksumOutput.write(buffer);
+ // Write checksum.
+ checksumOutput.finish();
+ // Write index record
+ indexRecord.rawLength = (long)buffer.length;
+ indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset;
+ spillRecord.putIndex(indexRecord, i);
+ reporter.progress();
+ }
+ rawOutput.close();
+ spillRecord.writeToFile(indexPath, jobConf);
+ }
+ }
+
+ static class KeyValueWriter<K, V> {
+ private Class<K> keyClass;
+ private Class<V> valueClass;
+ private DataOutputBuffer dataBuffer;
+ private Serializer<K> keySerializer;
+ private Serializer<V> valueSerializer;
+ private DataOutputStream outputStream;
+
+ public KeyValueWriter(Configuration conf, OutputStream output,
+ Class<K> kyClass, Class<V> valClass
+ ) throws IOException {
+ keyClass = kyClass;
+ valueClass = valClass;
+ dataBuffer = new DataOutputBuffer();
+ SerializationFactory serializationFactory
+ = new SerializationFactory(conf);
+ keySerializer
+ = (Serializer<K>)serializationFactory.getSerializer(keyClass);
+ keySerializer.open(dataBuffer);
+ valueSerializer
+ = (Serializer<V>)serializationFactory.getSerializer(valueClass);
+ valueSerializer.open(dataBuffer);
+ outputStream = new DataOutputStream(output);
+ }
+
+ public void write(K key, V value) throws IOException {
+ if (key.getClass() != keyClass) {
+ throw new IOException("wrong key class: "+ key.getClass()
+ +" is not "+ keyClass);
+ }
+ if (value.getClass() != valueClass) {
+ throw new IOException("wrong value class: "+ value.getClass()
+ +" is not "+ valueClass);
+ }
+ // Append the 'key'
+ keySerializer.serialize(key);
+ int keyLength = dataBuffer.getLength();
+ if (keyLength < 0) {
+ throw new IOException("Negative key-length not allowed: " + keyLength +
+ " for " + key);
+ }
+ // Append the 'value'
+ valueSerializer.serialize(value);
+ int valueLength = dataBuffer.getLength() - keyLength;
+ if (valueLength < 0) {
+ throw new IOException("Negative value-length not allowed: " +
+ valueLength + " for " + value);
+ }
+ // Write the record out
+ WritableUtils.writeVInt(outputStream, keyLength);
+ WritableUtils.writeVInt(outputStream, valueLength);
+ outputStream.write(dataBuffer.getData(), 0, dataBuffer.getLength());
+ // Reset
+ dataBuffer.reset();
+ }
+
+ public void close() throws IOException {
+ keySerializer.close();
+ valueSerializer.close();
+ WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER);
+ WritableUtils.writeVInt(outputStream, IFile.EOF_MARKER);
+ outputStream.close();
+ }
+ }
+}