MAPREDUCE-2467. HDFS-1052 changes break the raid contrib module in MapReduce. (suresh srinivas via mahadev)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1101994 13f79535-47bb-0310-9956-ffa450edef68
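In brief, the HDFS-1052 (federation) changes make LocatedBlock.getBlock() return an ExtendedBlock instead of a Block, tests now construct MiniDFSCluster through its Builder, and replica files are located via MiniDFSCluster.getBlockFile(). A rough sketch of the call pattern the raid contrib is moved to (variable names such as locatedBlock and dnIndex are placeholders, not code from the patch):

    // Federation-era block handling, as used throughout this patch.
    ExtendedBlock eb = locatedBlock.getBlock();            // was: Block
    Block local = eb.getLocalBlock();                      // unwrap where a pool-local Block is still expected
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
    File replicaFile = MiniDFSCluster.getBlockFile(dnIndex, eb);  // find a replica on disk in tests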
diff --git a/CHANGES.txt b/CHANGES.txt
index ed2ead6..89bc420 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -163,14 +163,14 @@
MAPREDUCE-2448. NoSuchMethodError:
org.apache.hadoop.hdfs.TestDatanodeBlockScanner.corruptReplica(..) (eli)
- MAPREDUCE-2465. Disable raid contrib which is unable to compile after
- HDFS federation merge. (todd)
-
MAPREDUCE-2460. Fix flaky test TestFairSchedulerSystem. (todd)
MAPREDUCE-2451. Log the details from health check script at the
JobTracker. (Thomas Graves via cdouglas)
+ MAPREDUCE-2467. HDFS-1052 changes break the raid contrib module in
+ MapReduce. (suresh srinivas via mahadev)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/contrib/build.xml b/src/contrib/build.xml
index 6c2e364..ddae8b9 100644
--- a/src/contrib/build.xml
+++ b/src/contrib/build.xml
@@ -22,21 +22,13 @@
<!-- In case one of the contrib subdirectories -->
<!-- fails the build or test targets and you cannot fix it: -->
<!-- Then add to fileset: excludes="badcontrib/build.xml" -->
- <patternset id="contrib-builds">
- <include name="*/build.xml" />
- <!-- raid compilation disabled due to failure after
- federation merge. See MAPREDUCE-2465. -->
- <exclude name="raid/build.xml" />
- </patternset>
<!-- ====================================================== -->
<!-- Compile contribs. -->
<!-- ====================================================== -->
<target name="compile">
<subant target="compile">
- <fileset dir=".">
- <patternset refid="contrib-builds" />
- </fileset>
+ <fileset dir="." includes="*/build.xml"/>
</subant>
</target>
@@ -45,9 +37,7 @@
<!-- ====================================================== -->
<target name="compile-test">
<subant target="compile-test">
- <fileset dir=".">
- <patternset refid="contrib-builds" />
- </fileset>
+ <fileset dir="." includes="*/build.xml"/>
</subant>
</target>
@@ -56,9 +46,7 @@
<!-- ====================================================== -->
<target name="package">
<subant target="package">
- <fileset dir=".">
- <patternset refid="contrib-builds" />
- </fileset>
+ <fileset dir="." includes="*/build.xml"/>
</subant>
</target>
@@ -80,9 +68,7 @@
<fileset dir="." includes="vertica/build.xml"/>
<!-- mumak tests disabled due to timeouts. See MAPREDUCE-2348
<fileset dir="." includes="mumak/build.xml"/> -->
- <!-- raid tests disabled due to compilation failure after
- federation merge. See MAPREDUCE-2465.
- <fileset dir="." includes="raid/build.xml"/> -->
+ <fileset dir="." includes="raid/build.xml"/>
</subant>
<available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
<fail if="testsfailed">Tests failed!</fail>
@@ -122,9 +108,7 @@
<!-- ====================================================== -->
<target name="clean">
<subant target="clean">
- <fileset dir=".">
- <patternset refid="contrib-builds" />
- </fileset>
+ <fileset dir="." includes="*/build.xml"/>
</subant>
</target>
diff --git a/src/contrib/raid/build.xml b/src/contrib/raid/build.xml
index 1aa549f..80a2ad6 100644
--- a/src/contrib/raid/build.xml
+++ b/src/contrib/raid/build.xml
@@ -34,7 +34,7 @@
<target name="test" depends="compile,compile-test,test-junit" description="Automated Test Framework" if="test.available"/>
<target name="test-junit" depends="compile,compile-test" if="test.available">
- <junit showoutput="${test.output}" fork="yes" printsummary="yes" errorProperty="tests.failed"
+ <junit maxmemory="512m" showoutput="${test.output}" fork="yes" printsummary="yes" errorProperty="tests.failed"
haltonfailure="no" failureProperty="tests.failed" timeout="${test.timeout}">
<classpath refid="test.classpath"/>
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
index 4fdac1b..a78c62e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
@@ -31,7 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.io.IOUtils;
@@ -46,10 +45,8 @@
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- private Block block; // the block to read from
+ private ExtendedBlock block; // the block to read from
- /** the replica to read from */
- private final Replica replica = null;
/** The visible length of a replica. */
private final long replicaVisibleLength;
@@ -79,7 +76,7 @@
private volatile ChunkChecksum lastChunkChecksum = null;
- public RaidBlockSender(Block block, long blockLength, long startOffset, long length,
+ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, boolean transferToAllowed,
DataInputStream metadataIn, InputStreamFactory streamFactory
@@ -90,14 +87,13 @@
metadataIn, streamFactory, null);
}
- public RaidBlockSender(Block block, long blockLength, long startOffset, long length,
+ public RaidBlockSender(ExtendedBlock block, long blockLength, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, boolean transferToAllowed,
DataInputStream metadataIn, InputStreamFactory streamFactory,
String clientTraceFmt) throws IOException {
try {
this.block = block;
- ChunkChecksum chunkChecksum = null;
this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
@@ -140,13 +136,7 @@
length = replicaVisibleLength;
}
- // end is either last byte on disk or the length for which we have a
- // checksum
- if (chunkChecksum != null) {
- endOffset = chunkChecksum.getDataLength();
- } else {
- endOffset = blockLength;
- }
+ endOffset = blockLength;
if (startOffset < 0 || startOffset > endOffset
|| (length + startOffset) > endOffset) {
@@ -166,10 +156,6 @@
if (tmpLen < endOffset) {
// will use on-disk checksum here since the end is a stable chunk
endOffset = tmpLen;
- } else if (chunkChecksum != null) {
- //in last chunk which is changing. flag that we need to use in-memory
- // checksum
- this.lastChunkChecksum = chunkChecksum;
}
}
@@ -454,10 +440,10 @@
}
private static class BlockInputStreamFactory implements InputStreamFactory {
- private final Block block;
+ private final ExtendedBlock block;
private final FSDatasetInterface data;
- private BlockInputStreamFactory(Block block, FSDatasetInterface data) {
+ private BlockInputStreamFactory(ExtendedBlock block, FSDatasetInterface data) {
this.block = block;
this.data = data;
}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
index 927279f..373bb17 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyRaid.java
@@ -27,7 +27,6 @@
import java.util.ArrayList;
import java.util.Map;
import java.util.Comparator;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +34,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -125,7 +125,7 @@
// Add the added block locations in the block locations cache.
// So the rest of the blocks know about these locations.
cachedLocatedBlocks.get(srcPath).
- add(new LocatedBlock(new Block(), result));
+ add(new LocatedBlock(new ExtendedBlock(), result));
return result;
} catch (Exception e) {
LOG.debug("Error happend when choosing datanode to write:" +
@@ -295,8 +295,8 @@
* null if it is the block which is currently being written to
* @return the block locations of companion blocks
*/
- List<LocatedBlock> getCompanionBlocks(String path, FileType type, Block block)
- throws IOException {
+ List<LocatedBlock> getCompanionBlocks(String path, FileType type,
+ Block block) throws IOException {
switch (type) {
case NOT_RAID:
return new ArrayList<LocatedBlock>();
@@ -415,7 +415,7 @@
return blocks.size();
}
for (int i = 0; i < blocks.size(); i++) {
- if (blocks.get(i).getBlock().equals(block)) {
+ if (blocks.get(i).getBlock().getLocalBlock().equals(block)) {
return i;
}
}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
index 207202f..0c8cc7b 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
@@ -26,18 +26,14 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.regex.Pattern;
import java.util.Random;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -47,9 +43,9 @@
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -57,23 +53,19 @@
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
-import org.apache.hadoop.io.Text;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.RaidUtils;
-import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
/**
@@ -375,7 +367,7 @@
return false;
}
for (LocatedBlock lb: corrupt) {
- Block corruptBlock = lb.getBlock();
+ ExtendedBlock corruptBlock = lb.getBlock();
long corruptOffset = lb.getStartOffset();
LOG.info("Found corrupt block " + corruptBlock +
@@ -446,7 +438,7 @@
return false;
}
for (LocatedBlock lb: corrupt) {
- Block corruptBlock = lb.getBlock();
+ ExtendedBlock corruptBlock = lb.getBlock();
long corruptOffset = lb.getStartOffset();
LOG.info("Found corrupt block " + corruptBlock +
@@ -512,7 +504,7 @@
return false;
}
for (LocatedBlock lb: corrupt) {
- Block corruptBlock = lb.getBlock();
+ ExtendedBlock corruptBlock = lb.getBlock();
long corruptOffset = lb.getStartOffset();
File localBlockFile =
@@ -541,7 +533,7 @@
* parity block in the part file block.
*/
private void processCorruptParityHarPartBlock(FileSystem dfs, Path partFile,
- Block corruptBlock,
+ ExtendedBlock corruptBlock,
long corruptOffset,
FileStatus partFileStat,
HarIndex harIndex,
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index cd12bef..ffdb4ed 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -38,7 +38,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.RaidUtils;
@@ -85,7 +85,7 @@
conf.set("xor".equals(erasureCode) ? RaidNode.RAID_LOCATION_KEY :
RaidNode.RAIDRS_LOCATION_KEY, "/destraid");
- dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
@@ -181,7 +181,7 @@
for (int blockNumToCorrupt : listBlockNumToCorrupt) {
LOG.info("Corrupt block " + blockNumToCorrupt + " of file " + srcFile);
LocatedBlocks locations = getBlockLocations(srcFile);
- corruptBlock(srcFile, locations.get(blockNumToCorrupt).getBlock(),
+ corruptBlock(dfs, srcFile, locations.get(blockNumToCorrupt).getBlock(),
NUM_DATANODES, true);
}
@@ -204,7 +204,6 @@
stripeLength = 3;
mySetup("rs", 3);
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
int[][] corrupt = {{1, 2, 3}, {1, 4, 7}, {3, 6, 7}};
try {
for (int i = 0; i < corrupt.length; i++) {
@@ -253,7 +252,7 @@
for (int blockIdx : corrupt) {
LOG.info("Corrupt block " + blockIdx + " of file " + file);
LocatedBlocks locations = getBlockLocations(file);
- corruptBlock(file, locations.get(blockIdx).getBlock(),
+ corruptBlock(dfs, file, locations.get(blockIdx).getBlock(),
NUM_DATANODES, true);
}
// Test that readFully returns the correct CRC when there are errors.
@@ -280,7 +279,6 @@
mySetup("xor", 1);
Path file = new Path("/user/dhruba/raidtest/file");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
createTestFilePartialLastBlock(fileSys, file, repl, numBlocks, blockSize);
FileStatus stat = fileSys.getFileStatus(file);
@@ -311,7 +309,6 @@
stripeLength = 3;
mySetup("xor", 1);
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
try {
for (int i = 0; i < corrupt.length; i++) {
@@ -445,92 +442,53 @@
LOG.info("Raid HDFS Recovery log verified");
}
- /*
- * The Data directories for a datanode
- */
- private static File[] getDataNodeDirs(int i) throws IOException {
- File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
- File data_dir = new File(base_dir, "data");
- File dir1 = new File(data_dir, "data"+(2*i+1));
- File dir2 = new File(data_dir, "data"+(2*i+2));
- if (dir1.isDirectory() && dir2.isDirectory()) {
- File[] dir = new File[2];
- dir[0] = new File(dir1, "current/finalized");
- dir[1] = new File(dir2, "current/finalized");
- return dir;
- }
- return new File[0];
- }
-
//
// Delete/Corrupt specified block of file
//
- public static void corruptBlock(Path file, Block blockNum,
+ public static void corruptBlock(MiniDFSCluster dfs, Path file, ExtendedBlock blockNum,
int numDataNodes, boolean delete) throws IOException {
- long id = blockNum.getBlockId();
-
- // Now deliberately remove/truncate data blocks from the block.
+    // Now deliberately remove/truncate replicas of the block
int numDeleted = 0;
int numCorrupted = 0;
for (int i = 0; i < numDataNodes; i++) {
- File[] dirs = getDataNodeDirs(i);
-
- for (int j = 0; j < dirs.length; j++) {
- File[] blocks = dirs[j].listFiles();
- assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
- for (int idx = 0; idx < blocks.length; idx++) {
- if (blocks[idx].getName().startsWith("blk_" + id) &&
- !blocks[idx].getName().endsWith(".meta")) {
- if (delete) {
- blocks[idx].delete();
- LOG.info("Deleted block " + blocks[idx]);
- numDeleted++;
- } else {
- // Corrupt
- File f = blocks[idx];
- long seekPos = f.length()/2;
- RandomAccessFile raf = new RandomAccessFile(f, "rw");
- raf.seek(seekPos);
- int data = raf.readInt();
- raf.seek(seekPos);
- raf.writeInt(data+1);
- LOG.info("Corrupted block " + blocks[idx]);
- numCorrupted++;
- }
- }
- }
+ File block = MiniDFSCluster.getBlockFile(i, blockNum);
+ if (block == null || !block.exists()) {
+ continue;
+ }
+ if (delete) {
+ block.delete();
+ LOG.info("Deleted block " + block);
+ numDeleted++;
+ } else {
+ // Corrupt
+ long seekPos = block.length()/2;
+ RandomAccessFile raf = new RandomAccessFile(block, "rw");
+ raf.seek(seekPos);
+ int data = raf.readInt();
+ raf.seek(seekPos);
+ raf.writeInt(data+1);
+ LOG.info("Corrupted block " + block);
+ numCorrupted++;
}
}
assertTrue("Nothing corrupted or deleted",
(numCorrupted + numDeleted) > 0);
}
- public static void corruptBlock(Path file, Block blockNum,
+ public static void corruptBlock(Path file, ExtendedBlock blockNum,
int numDataNodes, long offset) throws IOException {
- long id = blockNum.getBlockId();
-
- // Now deliberately remove/truncate data blocks from the block.
- //
+    // Now deliberately corrupt replicas of the block.
for (int i = 0; i < numDataNodes; i++) {
- File[] dirs = getDataNodeDirs(i);
-
- for (int j = 0; j < dirs.length; j++) {
- File[] blocks = dirs[j].listFiles();
- assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
- for (int idx = 0; idx < blocks.length; idx++) {
- if (blocks[idx].getName().startsWith("blk_" + id) &&
- !blocks[idx].getName().endsWith(".meta")) {
- // Corrupt
- File f = blocks[idx];
- RandomAccessFile raf = new RandomAccessFile(f, "rw");
- raf.seek(offset);
- int data = raf.readInt();
- raf.seek(offset);
- raf.writeInt(data+1);
- LOG.info("Corrupted block " + blocks[idx]);
- }
- }
+ File block = MiniDFSCluster.getBlockFile(i, blockNum);
+ if (block == null || !block.exists()) {
+ continue;
}
+ RandomAccessFile raf = new RandomAccessFile(block, "rw");
+ raf.seek(offset);
+ int data = raf.readInt();
+ raf.seek(offset);
+ raf.writeInt(data+1);
+ LOG.info("Corrupted block " + block);
}
}
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
index 0410b93..32a8598 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRaid.java
@@ -38,7 +38,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedFullPathNames;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid.CachedLocatedBlocks;
@@ -456,7 +456,7 @@
private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
int[] sourceBlockIndexes, int[] parityBlockIndexes) {
- Set<Block> blockSet = new HashSet<Block>();
+ Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
for (LocatedBlock b : companionBlocks) {
blockSet.add(b.getBlock());
}
@@ -496,10 +496,12 @@
private Collection<LocatedBlock> getCompanionBlocks(
FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
- Block block) throws IOException {
- INodeFile inode = namesystem.blockManager.blocksMap.getINode(block);
+ ExtendedBlock block) throws IOException {
+ INodeFile inode = namesystem.blockManager.blocksMap.getINode(block
+ .getLocalBlock());
FileType type = policy.getFileType(inode.getFullPathName());
- return policy.getCompanionBlocks(inode.getFullPathName(), type, block);
+ return policy.getCompanionBlocks(inode.getFullPathName(), type,
+ block.getLocalBlock());
}
private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
index edce286..10a7212 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
@@ -19,17 +19,10 @@
import java.io.File;
import java.io.FileWriter;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.PrintWriter;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Properties;
import java.util.Random;
import java.util.zip.CRC32;
@@ -40,26 +33,18 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.TestRaidDfs;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.RaidUtils;
@@ -186,7 +171,7 @@
// Corrupt blocks in two different stripes. We can fix them.
int[] corruptBlockIdxs = new int[]{0, 4, 6};
for (int idx: corruptBlockIdxs)
- corruptBlock(locs.get(idx).getBlock().getBlockName());
+ corruptBlock(locs.get(idx).getBlock());
reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
@@ -265,7 +250,7 @@
assertEquals("filesFixed() should return 0 before fixing files",
0, cnode.blockFixer.filesFixed());
- corruptBlock(locs.get(0).getBlock().getBlockName());
+ corruptBlock(locs.get(0).getBlock());
reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
@@ -295,7 +280,7 @@
// Now corrupt the generated block.
locs = RaidDFSUtil.getBlockLocations(
dfs, file1.toUri().getPath(), 0, srcStat.getLen());
- corruptBlock(locs.get(0).getBlock().getBlockName());
+ corruptBlock(locs.get(0).getBlock());
reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
try {
@@ -359,7 +344,6 @@
Path parityFile = new Path("/destraid/user/dhruba/raidtest/file1");
TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
1, 7, blockSize);
- long file1Len = fileSys.getFileStatus(file1).getLen();
LOG.info("Test " + testName + " created test files");
// create an instance of the RaidNode
@@ -395,7 +379,7 @@
// Corrupt parity blocks for different stripes.
int[] corruptBlockIdxs = new int[]{0, 1, 2};
for (int idx: corruptBlockIdxs)
- corruptBlock(locs.get(idx).getBlock().getBlockName());
+ corruptBlock(locs.get(idx).getBlock());
reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs, blockSize);
corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
@@ -441,11 +425,9 @@
int stripeLength = 3;
mySetup(stripeLength, 0); // Time before har = 0 days.
Path file1 = new Path("/user/dhruba/raidtest/file1");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
// Parity file will have 7 blocks.
TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
1, 20, blockSize);
- long file1Len = fileSys.getFileStatus(file1).getLen();
LOG.info("Test " + testName + " created test files");
// create an instance of the RaidNode
@@ -496,7 +478,7 @@
// Corrupt parity blocks for different stripes.
int[] corruptBlockIdxs = new int[]{0, 3};
for (int idx: corruptBlockIdxs)
- corruptBlock(locs.get(idx).getBlock().getBlockName());
+ corruptBlock(locs.get(idx).getBlock());
reportCorruptBlocks(dfs, partFile, corruptBlockIdxs,
partStat.getBlockSize());
@@ -564,7 +546,7 @@
conf.setBoolean("dfs.permissions", false);
- dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
@@ -637,9 +619,9 @@
return crc.getValue();
}
- void corruptBlock(String blockName) throws IOException {
+ void corruptBlock(ExtendedBlock block) throws IOException {
assertTrue("Could not corrupt block",
- dfs.corruptBlockOnDataNodes(blockName) > 0);
+ dfs.corruptBlockOnDataNodes(block) > 0);
}
static void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
index 36c087b..2de5568 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
@@ -17,51 +17,18 @@
*/
package org.apache.hadoop.raid;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.zip.CRC32;
-
import org.junit.Test;
import static org.junit.Assert.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.TestRaidDfs;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.raid.RaidNode;
-import org.apache.hadoop.raid.RaidUtils;
public class TestBlockFixerDistConcurrency extends TestBlockFixer {
/**
@@ -117,7 +84,7 @@
// corrupt file1
int[] corruptBlockIdxs = new int[]{0, 4, 6};
for (int idx: corruptBlockIdxs)
- corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+ corruptBlock(file1Loc.get(idx).getBlock());
reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
cnode = RaidNode.createRaidNode(null, localConf);
@@ -133,7 +100,7 @@
// corrupt file2
for (int idx: corruptBlockIdxs)
- corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+ corruptBlock(file2Loc.get(idx).getBlock());
reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
while (blockFixer.jobsRunning() < 2 &&
@@ -224,7 +191,7 @@
// corrupt file1
int[] corruptBlockIdxs = new int[]{0, 4, 6};
for (int idx: corruptBlockIdxs)
- corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+ corruptBlock(file1Loc.get(idx).getBlock());
reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
@@ -241,7 +208,7 @@
// corrupt file2
for (int idx: corruptBlockIdxs)
- corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+ corruptBlock(file2Loc.get(idx).getBlock());
reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
index 19b660f..9375e40 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
@@ -19,18 +19,7 @@
import java.io.File;
import java.io.FileWriter;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
import java.util.Random;
import java.util.zip.CRC32;
@@ -40,25 +29,17 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.hdfs.RaidDFSUtil;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.raid.RaidNode;
@@ -123,12 +104,11 @@
int[] corruptBlockIdxs = new int[]{0, 4, 6};
for (int idx: corruptBlockIdxs) {
LOG.info("Corrupting block " + locations.get(idx).getBlock());
- corruptBlock(locations.get(idx).getBlock().getBlockName());
+ corruptBlock(locations.get(idx).getBlock());
}
TestBlockFixer.reportCorruptBlocks(fileSys, file1, corruptBlockIdxs,
srcStat.getBlockSize());
- String fileUriPath = file1.toUri().getPath();
waitForCorruptBlocks(corruptBlockIdxs.length, dfs, file1);
// Create RaidShell and fix the file.
@@ -147,7 +127,7 @@
long parityCrc = getCRC(fileSys, parityFile);
locations = RaidDFSUtil.getBlockLocations(
dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
- corruptBlock(locations.get(0).getBlock().getBlockName());
+ corruptBlock(locations.get(0).getBlock());
TestBlockFixer.reportCorruptBlocks(fileSys, parityFile, new int[]{0},
srcStat.getBlockSize());
waitForCorruptBlocks(1, dfs, parityFile);
@@ -186,16 +166,6 @@
assertEquals(numCorruptBlocks, actual);
}
- private static DistributedFileSystem getDFS(
- Configuration conf, FileSystem dfs) throws IOException {
- Configuration clientConf = new Configuration(conf);
- clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
- URI dfsUri = dfs.getUri();
- FileSystem.closeAll();
- return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf);
- }
-
private void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
@@ -217,7 +187,7 @@
conf.setInt("hdfs.raid.stripeLength", stripeLength);
conf.set("hdfs.raid.locations", "/destraid");
- dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
@@ -286,8 +256,8 @@
return crc.getValue();
}
- void corruptBlock(String blockName) throws IOException {
+ void corruptBlock(ExtendedBlock block) throws IOException {
assertTrue("Could not corrupt block",
- dfs.corruptBlockOnDataNodes(blockName) > 0);
+ dfs.corruptBlockOnDataNodes(block) > 0);
}
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
index 973f6df..2841621 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
@@ -21,17 +21,7 @@
import java.io.FileWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
import java.util.Random;
-import java.util.zip.CRC32;
import org.junit.Test;
import org.junit.After;
@@ -40,23 +30,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.raid.RaidNode;
@@ -139,7 +122,8 @@
conf.setBoolean("dfs.permissions", false);
- cluster = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
+ .build();
cluster.waitActive();
dfs = (DistributedFileSystem) cluster.getFileSystem();
String namenode = dfs.getUri().toString();
@@ -318,7 +302,6 @@
private void waitUntilCorruptFileCount(DistributedFileSystem dfs,
int corruptFiles)
throws IOException {
- int initialCorruptFiles = RaidDFSUtil.getCorruptFiles(dfs).length;
long waitStart = System.currentTimeMillis();
while (RaidDFSUtil.getCorruptFiles(dfs).length != corruptFiles) {
try {
@@ -332,8 +315,6 @@
}
}
- long waited = System.currentTimeMillis() - waitStart;
-
int corruptFilesFound = RaidDFSUtil.getCorruptFiles(dfs).length;
if (corruptFilesFound != corruptFiles) {
throw new IOException("expected " + corruptFiles +
@@ -349,7 +330,7 @@
Path filePath,
LocatedBlock block)
throws IOException {
- TestRaidDfs.corruptBlock(filePath, block.getBlock(), NUM_DATANODES, true);
+ TestRaidDfs.corruptBlock(cluster, filePath, block.getBlock(), NUM_DATANODES, true);
// report deleted block to the name node
LocatedBlock[] toReport = { block };
@@ -467,28 +448,6 @@
}
}
-
- /**
- * returns the data directories for a data node
- */
- private File[] getDataDirs(int datanode) throws IOException{
- File data_dir = new File(System.getProperty("test.build.data"),
- "dfs/data/");
- File dir1 = new File(data_dir, "data"+(2 * datanode + 1));
- File dir2 = new File(data_dir, "data"+(2 * datanode + 2));
- if (!(dir1.isDirectory() && dir2.isDirectory())) {
- throw new IOException("data directories not found for data node " +
- datanode + ": " + dir1.toString() + " " +
- dir2.toString());
- }
-
- File[] dirs = new File[2];
- dirs[0] = new File(dir1, "current");
- dirs[1] = new File(dir2, "current");
- return dirs;
- }
-
-
/**
* checks fsck with no missing blocks
*/
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
index c9b74d6..31704af 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
@@ -25,28 +25,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.TestRaidDfs;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.raid.RaidNode;
public class TestReedSolomonDecoder extends TestCase {
@@ -93,8 +82,8 @@
FileStatus srcStat = fileSys.getFileStatus(file1);
LocatedBlocks locations = RaidDFSUtil.getBlockLocations(dfs,
file1.toUri().getPath(), 0, srcStat.getLen());
- corruptBlock(locations.get(5).getBlock().getBlockName());
- corruptBlock(locations.get(6).getBlock().getBlockName());
+ corruptBlock(locations.get(5).getBlock());
+ corruptBlock(locations.get(6).getBlock());
TestBlockFixer.reportCorruptBlocks(dfs, file1, new int[]{5, 6},
srcStat.getBlockSize());
@@ -115,9 +104,9 @@
}
}
- void corruptBlock(String blockName) throws IOException {
+ void corruptBlock(ExtendedBlock block) throws IOException {
assertTrue("Could not corrupt block",
- dfs.corruptBlockOnDataNodes(blockName) > 0);
+ dfs.corruptBlockOnDataNodes(block) > 0);
}
private void mySetup() throws Exception {
@@ -130,7 +119,7 @@
conf.setBoolean("dfs.permissions", false);
- dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
String namenode = fileSys.getUri().toString();