MAPREDUCE-2327. MapTask doesn't need to put username information in SpillRecord. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1103990 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index a3c7d10..b52704a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -736,6 +736,9 @@
MAPREDUCE-2486. Incorrect snapshot dependency published in .pom files
(todd)
+ MAPREDUCE-2327. MapTask doesn't need to put username information in
+ SpillRecord. (todd via tomwhite)
+
Release 0.21.1 - Unreleased
NEW FEATURES
diff --git a/src/java/org/apache/hadoop/mapred/MapTask.java b/src/java/org/apache/hadoop/mapred/MapTask.java
index 74b9429..15f9404 100644
--- a/src/java/org/apache/hadoop/mapred/MapTask.java
+++ b/src/java/org/apache/hadoop/mapred/MapTask.java
@@ -768,7 +768,8 @@
final ArrayList<SpillRecord> indexCacheList =
new ArrayList<SpillRecord>();
private int totalIndexCacheMemory;
- private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
+ private int indexCacheMemoryLimit;
+ private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
@@ -783,6 +784,8 @@
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
+ indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
+ INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
@@ -1466,7 +1469,7 @@
}
}
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
@@ -1531,7 +1534,7 @@
throw e;
}
}
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
+ if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
@@ -1647,8 +1650,7 @@
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
- indexCacheList.add(new SpillRecord(indexFileName, job,
- UserGroupInformation.getCurrentUser().getShortUserName()));
+ indexCacheList.add(new SpillRecord(indexFileName, job));
}
//make correction in the length to include the sequence file header
diff --git a/src/java/org/apache/hadoop/mapred/SpillRecord.java b/src/java/org/apache/hadoop/mapred/SpillRecord.java
index 969c56a..bb91e4f 100644
--- a/src/java/org/apache/hadoop/mapred/SpillRecord.java
+++ b/src/java/org/apache/hadoop/mapred/SpillRecord.java
@@ -50,6 +50,10 @@
entries = buf.asLongBuffer();
}
+ public SpillRecord(Path indexFileName, JobConf job) throws IOException {
+ this(indexFileName, job, null);
+ }
+
public SpillRecord(Path indexFileName, JobConf job, String expectedIndexOwner)
throws IOException {
this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
diff --git a/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index a3ec081..bcaeaf1 100644
--- a/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -128,6 +128,8 @@
public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+ public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+
public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
public static final String PRESERVE_FILES_PATTERN = "mapreduce.task.files.preserve.filepattern";
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java b/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
index a57e353..e1c8daa 100644
--- a/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
@@ -120,4 +120,27 @@
runJobAsUser(job2, BOB_UGI);
}
+ /**
+ * Regression test for MAPREDUCE-2327. Verifies that a map task
+ * succeeds even if it makes lots of spills (more than fit in the
+ * spill index cache).
+ */
+ public void testMultipleSpills() throws Exception {
+ JobConf job1 = mr.createJobConf();
+
+ // Make sure it spills twice
+ job1.setFloat(MRJobConfig.MAP_SORT_SPILL_PERCENT, 0.0001f);
+ job1.setInt(MRJobConfig.IO_SORT_MB, 1);
+
+ // Make sure the spill records don't fit in index cache
+ job1.setInt(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, 0);
+
+ String input = "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n";
+ Path inDir = new Path("/testing/distinct/input");
+ Path outDir = new Path("/user/alice/output");
+ TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
+ input, 2, 1, inDir, outDir);
+ runJobAsUser(job1, ALICE_UGI);
+ }
}