MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
correctly. (Ramkumar Vadali via schen)
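For context, client code reads a raided file through DistributedRaidFileSystem exactly as it would through any other FileSystem; this change makes the transparent recovery path trigger on ChecksumException as well as BlockMissingException. A minimal read-through sketch, assuming a locally running HDFS and an existing raided file (the URI, path, and class name below are placeholders, not taken from this patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RaidReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumption: fs.hdfs.impl points at DistributedRaidFileSystem in the
        // cluster configuration, so FileSystem.get returns the RAID wrapper.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020/"), conf);
        FSDataInputStream in = fs.open(new Path("/user/example/raided-file"));
        byte[] buf = new byte[64 * 1024];
        long total = 0;
        int n;
        while ((n = in.read(buf)) > 0) {
          // A corrupt or missing block surfaces during these reads; the RAID
          // wrapper retries against a reconstructed copy before the exception
          // would reach the caller.
          total += n;
        }
        in.close();
        System.out.println("Read " + total + " bytes");
      }
    }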
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1021524 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8666fef..5cfdf74 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -325,6 +325,9 @@
MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
Mathew via amareshwari)
+ MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
+ correctly. (Ramkumar Vadali via schen)
+
Release 0.21.1 - Unreleased
NEW FEATURES
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
index 89f404b..49defb0 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
@@ -334,14 +334,18 @@
while (alternates != null && nextLocation < alternates.length) {
try {
int idx = nextLocation++;
- long corruptOffset = -1;
- if (curexp instanceof BlockMissingException) {
- corruptOffset = ((BlockMissingException)curexp).getOffset();
- } else if (curexp instanceof ChecksumException) {
- corruptOffset = ((ChecksumException)curexp).getPos();
- }
- Path npath = RaidNode.unRaid(conf, path, alternates[idx], stripeLength,
- corruptOffset);
+ long corruptOffset = underLyingStream.getPos();
+
+ // Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
+ Configuration clientConf = new Configuration(conf);
+ Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
+ DistributedFileSystem.class);
+ clientConf.set("fs.hdfs.impl", clazz.getName());
+ // Disable caching so that a previously cached RaidDfs is not used.
+ clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+ Path npath = RaidNode.unRaid(clientConf, path,
+ alternates[idx], stripeLength,
+ corruptOffset);
FileSystem fs1 = getUnderlyingFileSystem(conf);
fs1.initialize(npath.toUri(), conf);
LOG.info("Opening alternate path " + npath + " at offset " + curpos);
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index c788a1e..f6e3aa6 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.RandomAccessFile;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@@ -187,11 +188,18 @@
assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem);
- // corrupt first block of file
LOG.info("Corrupt first block of file");
- corruptBlock(file1, locations.get(0).getBlock());
+ corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false);
validateFile(raidfs, file1, file1, crc1);
+ // Corrupt a second block. Validating the file is now expected to fail.
+ LOG.info("Corrupt second block of file");
+ corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false);
+ try {
+ validateFile(raidfs, file1, file1, crc1);
+ fail("Expected exception ChecksumException not thrown!");
+ } catch (org.apache.hadoop.fs.ChecksumException e) {
+ }
} catch (Exception e) {
LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
throw e;
@@ -288,43 +296,67 @@
/*
* The Data directories for a datanode
*/
- private File[] getDataNodeDirs(int i) throws IOException {
+ static private File[] getDataNodeDirs(int i) throws IOException {
File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
File data_dir = new File(base_dir, "data");
File dir1 = new File(data_dir, "data"+(2*i+1));
File dir2 = new File(data_dir, "data"+(2*i+2));
if (dir1.isDirectory() && dir2.isDirectory()) {
File[] dir = new File[2];
- dir[0] = new File(dir1, "current");
- dir[1] = new File(dir2, "current");
+ dir[0] = new File(dir1, "current/finalized");
+ dir[1] = new File(dir2, "current/finalized");
return dir;
}
return new File[0];
}
//
- // Corrupt specified block of file
+ // Delete/Corrupt specified block of file
//
- void corruptBlock(Path file, Block blockNum) throws IOException {
+ public static void corruptBlock(Path file, Block blockNum,
+ int numDataNodes, boolean delete) throws IOException {
long id = blockNum.getBlockId();
// Now deliberately remove/truncate data blocks from the block.
- //
- for (int i = 0; i < NUM_DATANODES; i++) {
+ int numDeleted = 0;
+ int numCorrupted = 0;
+ for (int i = 0; i < numDataNodes; i++) {
File[] dirs = getDataNodeDirs(i);
-
+
for (int j = 0; j < dirs.length; j++) {
File[] blocks = dirs[j].listFiles();
assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
for (int idx = 0; idx < blocks.length; idx++) {
if (blocks[idx].getName().startsWith("blk_" + id) &&
!blocks[idx].getName().endsWith(".meta")) {
- blocks[idx].delete();
- LOG.info("Deleted block " + blocks[idx]);
+ if (delete) {
+ blocks[idx].delete();
+ LOG.info("Deleted block " + blocks[idx]);
+ numDeleted++;
+ } else {
+ // Corrupt
+ File f = blocks[idx];
+ long seekPos = f.length()/2;
+ RandomAccessFile raf = new RandomAccessFile(f, "rw");
+ raf.seek(seekPos);
+ int data = raf.readInt();
+ raf.seek(seekPos);
+ raf.writeInt(data+1);
+ LOG.info("Corrupted block " + blocks[idx]);
+ numCorrupted++;
+ }
}
}
}
}
+ assertTrue("Nothing corrupted or deleted",
+ (numCorrupted + numDeleted) > 0);
}
+ //
+ // Delete specified block of file (the original corruptBlock behavior)
+ //
+ void corruptBlock(Path file, Block blockNum) throws IOException {
+ corruptBlock(file, blockNum, NUM_DATANODES, true);
+ }
}
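The new corruptBlock helper flips a single int in the middle of the on-disk block replica rather than deleting it, so the read path sees a ChecksumException instead of a missing block. The same technique, reduced to a standalone sketch over an arbitrary local file (the path comes from the command line, and the file is assumed to be at least 8 bytes long):

    import java.io.File;
    import java.io.RandomAccessFile;

    public class FlipBytesSketch {
      public static void main(String[] args) throws Exception {
        File f = new File(args[0]);       // local block replica to corrupt
        long seekPos = f.length() / 2;    // corrupt the middle of the file
        RandomAccessFile raf = new RandomAccessFile(f, "rw");
        try {
          raf.seek(seekPos);
          int data = raf.readInt();       // read four bytes at the midpoint
          raf.seek(seekPos);
          raf.writeInt(data + 1);         // write them back, off by one
        } finally {
          raf.close();
        }
      }
    }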