Fix collisions for shuffle input
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/yahoo-merge@1082631 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 3902323..285f7e9 100644
--- a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -225,7 +225,7 @@
getShortUserName());
TaskRunner.setupChildMapredLocalDirs(map, localConf);
- MapOutputFile mapOutput = new MapOutputFile();
+ MapOutputFile mapOutput = new MROutputFiles();
mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput);
@@ -389,7 +389,7 @@
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
- MapOutputFile localOutputFile = new MapOutputFile();
+ MapOutputFile localOutputFile = new MROutputFiles();
localOutputFile.setConf(localConf);
Path reduceIn =
localOutputFile.getInputFileForWrite(mapId.getTaskID(),
diff --git a/src/java/org/apache/hadoop/mapred/MapOutputFile.java b/src/java/org/apache/hadoop/mapred/MapOutputFile.java
index c40df2a..153abfc 100644
--- a/src/java/org/apache/hadoop/mapred/MapOutputFile.java
+++ b/src/java/org/apache/hadoop/mapred/MapOutputFile.java
@@ -39,9 +39,9 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class MapOutputFile implements Configurable {
+public abstract class MapOutputFile implements Configurable {
- private JobConf conf;
+ private Configuration conf;
static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
@@ -50,177 +50,121 @@
public MapOutputFile() {
}
- private LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
/**
* Return the path to local map output file created earlier
- *
+ *
* @return path
* @throws IOException
*/
- public Path getOutputFile()
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
- + MAP_OUTPUT_FILENAME_STRING, conf);
- }
+ public abstract Path getOutputFile() throws IOException;
/**
* Create a local map output file name.
- *
+ *
* @param size the size of the file
* @return path
* @throws IOException
*/
- public Path getOutputFileForWrite(long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
- + MAP_OUTPUT_FILENAME_STRING, size, conf);
- }
+ public abstract Path getOutputFileForWrite(long size) throws IOException;
/**
* Create a local map output file name on the same volume.
*/
- public Path getOutputFileForWriteInVolume(Path existing) {
- return new Path(existing.getParent(), MAP_OUTPUT_FILENAME_STRING);
- }
+ public abstract Path getOutputFileForWriteInVolume(Path existing);
/**
* Return the path to a local map output index file created earlier
- *
+ *
* @return path
* @throws IOException
*/
- public Path getOutputIndexFile()
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
- + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, conf);
- }
+ public abstract Path getOutputIndexFile() throws IOException;
/**
* Create a local map output index file name.
- *
+ *
* @param size the size of the file
* @return path
* @throws IOException
*/
- public Path getOutputIndexFileForWrite(long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
- + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
- size, conf);
- }
+ public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
/**
* Create a local map output index file name on the same volume.
*/
- public Path getOutputIndexFileForWriteInVolume(Path existing) {
- return new Path(existing.getParent(),
- MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING);
- }
+ public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
/**
* Return a local map spill file created earlier.
- *
+ *
* @param spillNumber the number
* @return path
* @throws IOException
*/
- public Path getSpillFile(int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
- + spillNumber + ".out", conf);
- }
+ public abstract Path getSpillFile(int spillNumber) throws IOException;
/**
* Create a local map spill file name.
- *
+ *
* @param spillNumber the number
* @param size the size of the file
* @return path
* @throws IOException
*/
- public Path getSpillFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
- + spillNumber + ".out", size, conf);
- }
+ public abstract Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException;
/**
* Return a local map spill index file created earlier
- *
+ *
* @param spillNumber the number
* @return path
* @throws IOException
*/
- public Path getSpillIndexFile(int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
- + spillNumber + ".out.index", conf);
- }
+ public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
/**
* Create a local map spill index file name.
- *
+ *
* @param spillNumber the number
* @param size the size of the file
* @return path
* @throws IOException
*/
- public Path getSpillIndexFileForWrite(int spillNumber, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
- + spillNumber + ".out.index", size, conf);
- }
+ public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException;
/**
* Return a local reduce input file created earlier
- *
+ *
* @param mapId a map task id
* @return path
- * @throws IOException
+ * @throws IOException
*/
- public Path getInputFile(int mapId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(String.format(
- REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
- .valueOf(mapId)), conf);
- }
+ public abstract Path getInputFile(int mapId) throws IOException;
/**
* Create a local reduce input file name.
- *
+ *
* @param mapId a map task id
* @param size the size of the file
* @return path
* @throws IOException
*/
- public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
- long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(String.format(
- REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
- size, conf);
- }
+ public abstract Path getInputFileForWrite(
+ org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException;
/** Removes all of the files related to a task. */
- public void removeAll()
- throws IOException {
- conf.deleteLocalFiles(TaskTracker.OUTPUT);
- }
+ public abstract void removeAll() throws IOException;
@Override
public void setConf(Configuration conf) {
- if (conf instanceof JobConf) {
- this.conf = (JobConf) conf;
- } else {
- this.conf = new JobConf(conf);
- }
+ this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
-
+
}
diff --git a/src/java/org/apache/hadoop/mapred/ReduceTask.java b/src/java/org/apache/hadoop/mapred/ReduceTask.java
index 93fab5a..d9fb718 100644
--- a/src/java/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -407,7 +407,8 @@
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
- taskStatus, copyPhase, sortPhase, this);
+ taskStatus, copyPhase, sortPhase, this,
+ mapOutputFile);
rIter = shuffle.run();
} else {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
diff --git a/src/java/org/apache/hadoop/mapred/Task.java b/src/java/org/apache/hadoop/mapred/Task.java
index 57fcc3f..ce8cb49 100644
--- a/src/java/org/apache/hadoop/mapred/Task.java
+++ b/src/java/org/apache/hadoop/mapred/Task.java
@@ -1187,7 +1187,7 @@
}
this.mapOutputFile = ReflectionUtils.newInstance(
conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
- MapOutputFile.class, MapOutputFile.class), conf);
+ MROutputFiles.class, MapOutputFile.class), conf);
this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
// add the static resolutions (this is required for the junit to
// work on testcases that simulate multiple nodes on a single physical
diff --git a/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
index c0a8207..83b4d65 100644
--- a/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
+++ b/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
@@ -133,7 +133,7 @@
Counters.Counter reduceCombineInputCounter,
Counters.Counter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter,
- Progress mergePhase) {
+ Progress mergePhase, MapOutputFile mapOutputFile) {
this.reduceId = reduceId;
this.jobConf = jobConf;
this.localDirAllocator = localDirAllocator;
@@ -146,7 +146,7 @@
this.reduceCombineInputCounter = reduceCombineInputCounter;
this.spilledRecordsCounter = spilledRecordsCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
- this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile = mapOutputFile;
this.mapOutputFile.setConf(jobConf);
this.localFS = localFS;
diff --git a/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index 177a248..4b8b854 100644
--- a/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
@@ -75,7 +76,8 @@
TaskStatus status,
Progress copyPhase,
Progress mergePhase,
- Task reduceTask) {
+ Task reduceTask,
+ MapOutputFile mapOutputFile) {
this.reduceId = reduceId;
this.jobConf = jobConf;
this.umbilical = umbilical;
@@ -95,7 +97,7 @@
spilledRecordsCounter,
reduceCombineInputCounter,
mergedMapOutputsCounter,
- this, mergePhase);
+ this, mergePhase, mapOutputFile);
}
@SuppressWarnings("unchecked")
diff --git a/src/test/mapred-site.xml b/src/test/mapred-site.xml
index 6773c40..4874e61 100644
--- a/src/test/mapred-site.xml
+++ b/src/test/mapred-site.xml
@@ -48,4 +48,8 @@
<name>mapreduce.jobtracker.persist.jobstatus.active</name>
<value>false</value>
</property>
+<property>
+ <name>mapreduce.task.local.output.class</name>
+ <value>org.apache.hadoop.mapred.MROutputFiles</value>
+</property>
</configuration>
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java b/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
index 8b004a6..b4d4c90 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
@@ -306,7 +306,7 @@
) throws IOException {
if (first) {
first = false;
- MapOutputFile mapOutputFile = new MapOutputFile();
+ MapOutputFile mapOutputFile = new MROutputFiles();
mapOutputFile.setConf(conf);
Path input = mapOutputFile.getInputFile(0);
FileSystem fs = FileSystem.get(conf);