Fix collisions for shuffle input

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/branches/yahoo-merge@1082631 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 3902323..285f7e9 100644
--- a/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -225,7 +225,7 @@
               getShortUserName());
           TaskRunner.setupChildMapredLocalDirs(map, localConf);
 
-          MapOutputFile mapOutput = new MapOutputFile();
+          MapOutputFile mapOutput = new MROutputFiles();
           mapOutput.setConf(localConf);
           mapOutputFiles.put(mapId, mapOutput);
 
@@ -389,7 +389,7 @@
               if (!this.isInterrupted()) {
                 TaskAttemptID mapId = mapIds.get(i);
                 Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
-                MapOutputFile localOutputFile = new MapOutputFile();
+                MapOutputFile localOutputFile = new MROutputFiles();
                 localOutputFile.setConf(localConf);
                 Path reduceIn =
                   localOutputFile.getInputFileForWrite(mapId.getTaskID(),
diff --git a/src/java/org/apache/hadoop/mapred/MapOutputFile.java b/src/java/org/apache/hadoop/mapred/MapOutputFile.java
index c40df2a..153abfc 100644
--- a/src/java/org/apache/hadoop/mapred/MapOutputFile.java
+++ b/src/java/org/apache/hadoop/mapred/MapOutputFile.java
@@ -39,9 +39,9 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MapOutputFile implements Configurable {
+public abstract class MapOutputFile implements Configurable {
 
-  private JobConf conf;
+  private Configuration conf;
 
   static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
   static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
@@ -50,177 +50,121 @@
   public MapOutputFile() {
   }
 
-  private LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(MRConfig.LOCAL_DIR);
-  
   /**
    * Return the path to local map output file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
-  public Path getOutputFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, conf);
-  }
+  public abstract Path getOutputFile() throws IOException;
 
   /**
    * Create a local map output file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getOutputFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, size, conf);
-  }
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
 
   /**
    * Create a local map output file name on the same volume.
    */
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(), MAP_OUTPUT_FILENAME_STRING);
-  }
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
 
   /**
    * Return the path to a local map output index file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
-  public Path getOutputIndexFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, conf);
-  }
+  public abstract Path getOutputIndexFile() throws IOException;
 
   /**
    * Create a local map output index file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getOutputIndexFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
-        size, conf);
-  }
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
 
   /**
    * Create a local map output index file name on the same volume.
    */
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(),
-        MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING);
-  }
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
 
   /**
    * Return a local map spill file created earlier.
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
    */
-  public Path getSpillFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out", conf);
-  }
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
 
   /**
    * Create a local map spill file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out", size, conf);
-  }
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
 
   /**
    * Return a local map spill index file created earlier
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
    */
-  public Path getSpillIndexFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out.index", conf);
-  }
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
 
   /**
    * Create a local map spill index file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out.index", size, conf);
-  }
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
 
   /**
    * Return a local reduce input file created earlier
-   * 
+   *
    * @param mapId a map task id
    * @return path
-   * @throws IOException 
+   * @throws IOException
    */
-  public Path getInputFile(int mapId)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
-            .valueOf(mapId)), conf);
-  }
+  public abstract Path getInputFile(int mapId) throws IOException;
 
   /**
    * Create a local reduce input file name.
-   * 
+   *
    * @param mapId a map task id
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, 
-                                   long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
-        size, conf);
-  }
+  public abstract Path getInputFileForWrite(
+      org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException;
 
   /** Removes all of the files related to a task. */
-  public void removeAll()
-      throws IOException {
-    conf.deleteLocalFiles(TaskTracker.OUTPUT);
-  }
+  public abstract void removeAll() throws IOException;
 
   @Override
   public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
+    this.conf = conf;
   }
 
   @Override
   public Configuration getConf() {
     return conf;
   }
-  
+
 }
diff --git a/src/java/org/apache/hadoop/mapred/ReduceTask.java b/src/java/org/apache/hadoop/mapred/ReduceTask.java
index 93fab5a..d9fb718 100644
--- a/src/java/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -407,7 +407,8 @@
                     shuffledMapsCounter,
                     reduceShuffleBytes, failedShuffleCounter,
                     mergedMapOutputsCounter,
-                    taskStatus, copyPhase, sortPhase, this);
+                    taskStatus, copyPhase, sortPhase, this,
+                    mapOutputFile);
       rIter = shuffle.run();
     } else {
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();
diff --git a/src/java/org/apache/hadoop/mapred/Task.java b/src/java/org/apache/hadoop/mapred/Task.java
index 57fcc3f..ce8cb49 100644
--- a/src/java/org/apache/hadoop/mapred/Task.java
+++ b/src/java/org/apache/hadoop/mapred/Task.java
@@ -1187,7 +1187,7 @@
     }
     this.mapOutputFile = ReflectionUtils.newInstance(
         conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
-          MapOutputFile.class, MapOutputFile.class), conf);
+          MROutputFiles.class, MapOutputFile.class), conf);
     this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
     // add the static resolutions (this is required for the junit to
     // work on testcases that simulate multiple nodes on a single physical
diff --git a/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java b/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
index c0a8207..83b4d65 100644
--- a/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
+++ b/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
@@ -133,7 +133,7 @@
                       Counters.Counter reduceCombineInputCounter,
                       Counters.Counter mergedMapOutputsCounter,
                       ExceptionReporter exceptionReporter,
-                      Progress mergePhase) {
+                      Progress mergePhase, MapOutputFile mapOutputFile) {
     this.reduceId = reduceId;
     this.jobConf = jobConf;
     this.localDirAllocator = localDirAllocator;
@@ -146,7 +146,7 @@
     this.reduceCombineInputCounter = reduceCombineInputCounter;
     this.spilledRecordsCounter = spilledRecordsCounter;
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile = mapOutputFile;
     this.mapOutputFile.setConf(jobConf);
     
     this.localFS = localFS;
diff --git a/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index 177a248..4b8b854 100644
--- a/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -75,7 +76,8 @@
                  TaskStatus status,
                  Progress copyPhase,
                  Progress mergePhase,
-                 Task reduceTask) {
+                 Task reduceTask,
+                 MapOutputFile mapOutputFile) {
     this.reduceId = reduceId;
     this.jobConf = jobConf;
     this.umbilical = umbilical;
@@ -95,7 +97,7 @@
                                     spilledRecordsCounter, 
                                     reduceCombineInputCounter, 
                                     mergedMapOutputsCounter, 
-                                    this, mergePhase);
+                                    this, mergePhase, mapOutputFile);
   }
 
   @SuppressWarnings("unchecked")
diff --git a/src/test/mapred-site.xml b/src/test/mapred-site.xml
index 6773c40..4874e61 100644
--- a/src/test/mapred-site.xml
+++ b/src/test/mapred-site.xml
@@ -48,4 +48,8 @@
   <name>mapreduce.jobtracker.persist.jobstatus.active</name>
   <value>false</value>
 </property>
+<property>
+  <name>mapreduce.task.local.output.class</name>
+  <value>org.apache.hadoop.mapred.MROutputFiles</value>
+</property>
 </configuration>
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java b/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
index 8b004a6..b4d4c90 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
@@ -306,7 +306,7 @@
                        ) throws IOException {
       if (first) {
         first = false;
-        MapOutputFile mapOutputFile = new MapOutputFile();
+        MapOutputFile mapOutputFile = new MROutputFiles();
         mapOutputFile.setConf(conf);
         Path input = mapOutputFile.getInputFile(0);
         FileSystem fs = FileSystem.get(conf);