Merging trunk into branch-trunk-win
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-trunk-win@1453456 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index dfc42e0..d170aed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -349,6 +349,9 @@
HDFS-4540. Namenode http server should use the web authentication
keytab for spnego principal. (Arpit Gupta via suresh)
+ HDFS-4544. Errors encountered while deleting blocks should not trigger
+ a disk check, regardless of the error type. (Arpit Agarwal via suresh)
+
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 6738241..69fcdd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -538,7 +538,7 @@
// using global fsdataset
dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
- dn.checkDiskError();
+ // Exceptions caught here are not expected to be disk-related.
throw e;
}
dn.metrics.incrBlocksRemoved(toDelete.length);
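
The hunk above implements HDFS-4544: a failure while deleting blocks does not by itself indicate a bad disk (the replica may already be gone, for example), so the expensive dn.checkDiskError() volume scan is no longer triggered from this catch block. A self-contained illustration of the distinction, using plain java.nio instead of the datanode classes (hypothetical demo, not part of the patch):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.NoSuchFileException;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class DeleteErrorDemo {
      public static void main(String[] args) throws IOException {
        Path missing = Paths.get("replica-that-was-already-deleted");
        try {
          Files.delete(missing); // fails logically; the disk is fine
        } catch (NoSuchFileException e) {
          // A missing file is a logical failure, not a disk fault, so
          // rescanning every volume (the old checkDiskError() behavior)
          // would be wasted work here.
          System.out.println("logical failure, no disk check needed: " + e);
        }
      }
    }
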
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9014022..dc877be 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -201,6 +201,10 @@
MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does
not exist. (sandyr via tucu)
+ MAPREDUCE-3685. Fix bugs in MergeManager to ensure the compression codec
+ is used appropriately and that on-disk segments are correctly sorted by
+ file size. (Anty Rao and Ravi Prakash via acmurthy)
+
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index d007470..ced9040 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -169,7 +169,7 @@
}
- static <K extends Object, V extends Object>
+ public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec,
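
Widening merge() from package-private to public lets MergeManagerImpl invoke this codec-aware overload directly; the corresponding call-site change (passing codec through) appears in the MergeManagerImpl hunk below. For context, a sketch of how the map-output codec is typically materialized from the job configuration (standard MapReduce API, shown for illustration; not part of this patch):

    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CodecLookupSketch {
      // Without handing this codec to Merger.merge(), compressed on-disk
      // segments would be read back as if they were uncompressed.
      static CompressionCodec mapOutputCodec(JobConf jobConf) {
        if (jobConf.getCompressMapOutput()) {
          Class<? extends CompressionCodec> codecClass =
              jobConf.getMapOutputCompressorClass(DefaultCodec.class);
          return ReflectionUtils.newInstance(codecClass, jobConf);
        }
        return null;
      }
    }
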
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 138ea43..c6f9a36 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -477,7 +477,7 @@
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
- writer.getRawLength());
+ writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@
private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
public OnDiskMerger(MergeManagerImpl<K, V> manager) {
- super(manager, Integer.MAX_VALUE, exceptionReporter);
+ super(manager, ioSortFactor, exceptionReporter);
setName("OnDiskMerger - Thread to merge on-disk map-outputs");
setDaemon(true);
}
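
Passing ioSortFactor instead of Integer.MAX_VALUE caps each on-disk merge batch at io.sort.factor inputs. Previously a single batch could pull in every pending file at once; with io.sort.factor = 5 and ten completed on-disk files, the merger should now queue batches of at most five inputs each. The new testOnDiskMerger test at the end of this change constructs exactly that scenario (2 * SORT_FACTOR fake files) and asserts both the batch-size bound and the size-sorted order within each batch.
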
@@ -554,7 +554,7 @@
Merger.writeFile(iter, writer, reporter, jobConf);
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
- writer.getRawLength());
+ writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
@@ -719,7 +719,7 @@
Merger.writeFile(rIter, writer, reporter, job);
writer.close();
onDiskMapOutputs.add(new CompressAwarePath(outputPath,
- writer.getRawLength()));
+ writer.getRawLength(), writer.getCompressedLength()));
writer = null;
// add to list of final disk outputs.
} catch (IOException e) {
@@ -791,7 +791,7 @@
// merges. See comment where mergePhaseFinished is being set
Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
RawKeyValueIterator diskMerge = Merger.merge(
- job, fs, keyClass, valueClass, diskSegments,
+ job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null, thisPhase);
diskSegments.clear();
@@ -810,24 +810,45 @@
static class CompressAwarePath extends Path {
private long rawDataLength;
+ private long compressedSize;
- public CompressAwarePath(Path path, long rawDataLength) {
+ public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
super(path.toUri());
this.rawDataLength = rawDataLength;
+ this.compressedSize = compressSize;
}
public long getRawDataLength() {
return rawDataLength;
}
-
+
+ public long getCompressedSize() {
+ return compressedSize;
+ }
+
@Override
public boolean equals(Object other) {
return super.equals(other);
}
-
+
@Override
public int hashCode() {
return super.hashCode();
}
+
+ @Override
+ public int compareTo(Object obj) {
+ if(obj instanceof CompressAwarePath) {
+ CompressAwarePath compPath = (CompressAwarePath) obj;
+ if(this.compressedSize < compPath.getCompressedSize()) {
+ return -1;
+ } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+ return 1;
+ }
+ // Not returning 0 here so that objects with the same size (but
+ // different paths) are still added to the TreeSet.
+ }
+ return super.compareTo(obj);
+ }
}
}
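
The compareTo() override above orders segments by compressed size but deliberately never returns 0 for distinct paths, falling back to Path's own ordering as a tie-breaker. That matters because sorted collections treat compareTo() == 0 as equality: a TreeSet would silently drop a second segment that happened to have the same size. A self-contained sketch of that pitfall with a simplified stand-in class (hypothetical demo, not the Hadoop type):

    import java.util.TreeSet;

    public class TreeSetDedupDemo {
      // Simplified stand-in for CompressAwarePath: a name plus a size.
      static class Seg implements Comparable<Seg> {
        final String name;
        final long size;
        Seg(String name, long size) { this.name = name; this.size = size; }
        @Override
        public int compareTo(Seg other) {
          // Naive ordering: returns 0 whenever sizes match, so a TreeSet
          // treats two distinct same-sized segments as duplicates.
          return Long.compare(size, other.size);
        }
        @Override
        public String toString() { return name; }
      }

      public static void main(String[] args) {
        TreeSet<Seg> set = new TreeSet<Seg>();
        set.add(new Seg("map_0.out", 100));
        set.add(new Seg("map_1.out", 100)); // rejected as a "duplicate"
        System.out.println(set); // prints [map_0.out] -- one segment lost
      }
    }
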
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index bf69798..68713d3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -48,6 +48,7 @@
private final Path outputPath;
private final MergeManagerImpl<K, V> merger;
private final OutputStream disk;
+ private long compressedSize;
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@
bytesLeft + " bytes missing of " +
compressedLength + ")");
}
+ this.compressedSize = compressedLength;
}
@Override
public void commit() throws IOException {
localFS.rename(tmpOutputPath, outputPath);
CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
- getSize());
+ getSize(), this.compressedSize);
merger.closeOnDiskFile(compressAwarePath);
}
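
Recording compressedLength at shuffle time gives CompressAwarePath the real on-disk size rather than only the raw (decompressed) data length; the two can differ by orders of magnitude, which is why the merge order now uses the compressed size. A self-contained illustration using java.util.zip in place of a Hadoop codec (hypothetical demo, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class RawVsCompressedDemo {
      public static void main(String[] args) throws IOException {
        byte[] raw = new byte[100 * 1024]; // highly repetitive (all zeros)
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos);
        gz.write(raw);
        gz.close();
        // Sorting merges by raw length would badly misjudge how much
        // on-disk data each pass actually reads and writes.
        System.out.println("raw = " + raw.length
            + " bytes, on disk = " + bos.size() + " bytes");
      }
    }
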
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
index 46d797c..8d6bab9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
@@ -17,28 +17,38 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
public class TestMergeManager {
@Test(timeout=10000)
- @SuppressWarnings("unchecked")
public void testMemoryMerge() throws Exception {
final int TOTAL_MEM_BYTES = 10000;
final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@
return exceptions.size();
}
}
+
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ @Test(timeout=10000)
+ public void testOnDiskMerger() throws IOException, URISyntaxException,
+ InterruptedException {
+ JobConf jobConf = new JobConf();
+ final int SORT_FACTOR = 5;
+ jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+ MapOutputFile mapOutputFile = new MROutputFiles();
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ MergeManagerImpl<IntWritable, IntWritable> manager =
+ new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+ , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+ MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+ onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+ IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+ "onDiskMerger");
+ int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+ "mergeFactor");
+
+ // make sure the io.sort.factor is set properly
+ assertEquals(SORT_FACTOR, mergeFactor);
+
+ // Stop the onDiskMerger thread so that we can intercept the list of files
+ // waiting to be merged.
+ onDiskMerger.suspend();
+
+ // Send the list of fake files waiting to be merged
+ Random rand = new Random();
+ for (int i = 0; i < 2 * SORT_FACTOR; ++i) {
+ Path path = new Path("somePath");
+ CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
+ manager.closeOnDiskFile(cap);
+ }
+
+ // Check that the files pending to be merged are in sorted order.
+ LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+ (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+ onDiskMerger, "pendingToBeMerged");
+ assertTrue("No inputs were added to list pending to merge",
+ pendingToBeMerged.size() > 0);
+ for (int i = 0; i < pendingToBeMerged.size(); ++i) {
+ List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+ assertTrue("Not enough / too many inputs were going to be merged",
+ inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+ for (int j = 1; j < inputs.size(); ++j) {
+ assertTrue("Inputs to be merged were not sorted according to size",
+ inputs.get(j).getCompressedSize()
+ >= inputs.get(j-1).getCompressedSize());
+ }
+ }
+
+ }
}