HDFS-1371. One bad node can incorrectly flag many files as corrupt. Contributed by Tanping Wang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1125145 13f79535-47bb-0310-9956-ffa450edef68
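
The heart of the change, as an illustrative sketch (simplified names, not the
literal patch code): during a single read the client remembers which replicas
failed checksums, and reports to the namenode only when the evidence points at
the replicas rather than at the client itself:

    // failedReplicas: replicas that failed checksums during one read
    // dataNodeCount:  total number of replicas of the block
    static boolean shouldReport(int failedReplicas, int dataNodeCount) {
      // Case I: some replicas failed but at least one succeeded -> the
      // failed replicas are very likely corrupt; report them.
      if (failedReplicas > 0 && failedReplicas < dataNodeCount) {
        return true;
      }
      // Case II: every replica failed. Report only when there is a single
      // replica; if several replicas all fail, the fault may lie with the
      // client, so stay silent.
      return dataNodeCount == 1 && failedReplicas == 1;
    }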
diff --git a/CHANGES.txt b/CHANGES.txt
index c2c5a59..3237fb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -610,6 +610,9 @@
HDFS-1922. Fix recurring failure of TestJMXGet (Luke Lu via todd)
+ HDFS-1371. One bad node can incorrectly flag many files as corrupt.
+ (Tanping Wang via jitendra)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
index b74b55d..0052d65 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -22,8 +22,13 @@
import java.net.Socket;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,7 +67,7 @@
private LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private DatanodeInfo currentNode = null;
- private ExtendedBlock currentBlock = null;
+ private LocatedBlock currentLocatedBlock = null;
private long pos = 0;
private long blockEnd = -1;
@@ -204,8 +209,11 @@
/**
* Returns the block containing the target position.
*/
- public ExtendedBlock getCurrentBlock() {
- return currentBlock;
+ public synchronized ExtendedBlock getCurrentBlock() {
+ if (currentLocatedBlock == null) {
+ return null;
+ }
+ return currentLocatedBlock.getBlock();
}
/**
@@ -261,7 +269,7 @@
if (updatePosition) {
pos = offset;
blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
- currentBlock = blk.getBlock();
+ currentLocatedBlock = blk;
}
return blk;
}
@@ -462,8 +470,9 @@
* name readBuffer() is chosen to imply similarity to readBuffer() in
* ChecksumFileSystem
*/
- private synchronized int readBuffer(byte buf[], int off, int len)
- throws IOException {
+ private synchronized int readBuffer(byte buf[], int off, int len,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
IOException ioe;
/* we retry current node only once. So this is set to true only here.
@@ -479,16 +488,19 @@
try {
return blockReader.read(buf, off, len);
} catch ( ChecksumException ce ) {
- DFSClient.LOG.warn("Found Checksum error for " + currentBlock + " from " +
- currentNode.getName() + " at " + ce.getPos());
- dfsClient.reportChecksumFailure(src, currentBlock, currentNode);
+ DFSClient.LOG.warn("Found Checksum error for "
+ + getCurrentBlock() + " from " + currentNode.getName()
+ + " at " + ce.getPos());
ioe = ce;
retryCurrentNode = false;
+ // we want to remember which block replicas we have tried
+ addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+ corruptedBlockMap);
} catch ( IOException e ) {
if (!retryCurrentNode) {
- DFSClient.LOG.warn("Exception while reading from " + currentBlock +
- " of " + src + " from " + currentNode + ": " +
- StringUtils.stringifyException(e));
+ DFSClient.LOG.warn("Exception while reading from "
+ + getCurrentBlock() + " of " + src + " from "
+ + currentNode + ": " + StringUtils.stringifyException(e));
}
ioe = e;
}
@@ -519,6 +531,8 @@
if (closed) {
throw new IOException("Stream closed");
}
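+ // Replicas on which this read sees checksum errors; reported to the
+ // namenode and cleared once per read attempt (see the finally block below).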
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+ = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
failures = 0;
if (pos < getFileLength()) {
int retries = 2;
@@ -528,7 +542,7 @@
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
- int result = readBuffer(buf, off, realLen);
+ int result = readBuffer(buf, off, realLen, corruptedBlockMap);
if (result >= 0) {
pos += result;
@@ -551,12 +565,34 @@
if (--retries == 0) {
throw e;
}
+ } finally {
+ // Report any corrupted block replicas we have seen, whether the read
+ // succeeded or a ChecksumException occurred.
+ reportCheckSumFailure(corruptedBlockMap,
+ currentLocatedBlock.getLocations().length);
}
}
}
return -1;
}
+ /**
+ * Record a corrupted block replica in the map.
+ * @param blk the corrupted block
+ * @param node the data node hosting the corrupted replica
+ * @param corruptedBlockMap map from a block to the data nodes with corrupted replicas
+ */
+ private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ Set<DatanodeInfo> dnSet = null;
+ if (corruptedBlockMap.containsKey(blk)) {
+ dnSet = corruptedBlockMap.get(blk);
+ } else {
+ dnSet = new HashSet<DatanodeInfo>();
+ }
+ if (!dnSet.contains(node)) {
+ dnSet.add(node);
+ corruptedBlockMap.put(blk, dnSet);
+ }
+ }
private DNAddrPair chooseDataNode(LocatedBlock block)
throws IOException {
@@ -605,8 +641,10 @@
}
}
- private void fetchBlockByteRange(LocatedBlock block, long start,
- long end, byte[] buf, int offset) throws IOException {
+ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
+ byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
//
// Connect to best DataNode for desired Block, with potential offset
//
@@ -646,7 +684,8 @@
DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode.getName());
- dfsClient.reportChecksumFailure(src, block.getBlock(), chosenNode);
+ // we want to remember what we have tried
+ addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
} catch (IOException e) {
if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
DFSClient.LOG.info("Will get a new access token and retry, "
@@ -703,11 +742,21 @@
// corresponding to position and realLen
List<LocatedBlock> blockRange = getBlockRange(position, realLen);
int remaining = realLen;
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+ = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
- fetchBlockByteRange(blk, targetStart,
- targetStart + bytesToRead - 1, buffer, offset);
+ try {
+ fetchBlockByteRange(blk, targetStart,
+ targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+ } finally {
+ // Check for and report any corrupted block replicas. A
+ // BlockMissingException may still be caught by the caller when all
+ // block replicas are corrupted.
+ reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+ }
+
remaining -= bytesToRead;
position += bytesToRead;
offset += bytesToRead;
@@ -718,7 +767,43 @@
}
return realLen;
}
-
+
+ /**
+ * DFSInputStream reports checksum failures.
+ * Case I : the client has tried multiple data nodes and at least one of
+ * the attempts has succeeded. We report the other failures as corrupted
+ * block replicas to the name node.
+ * Case II: the client has tried all data nodes and every attempt failed.
+ * We only report when the total number of replicas is 1, since otherwise
+ * the failures may be caused by the client itself (for example, a client
+ * that cannot compute checksums correctly).
+ * @param corruptedBlockMap map of corrupted blocks
+ * @param dataNodeCount number of data nodes hosting the block replicas
+ */
+ private void reportCheckSumFailure(
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ int dataNodeCount) {
+ if (corruptedBlockMap.isEmpty()) {
+ return;
+ }
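+ // Failures are collected for a single block per read, so the map is
+ // expected to hold at most one entry here.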
+ Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
+ .entrySet().iterator();
+ Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
+ ExtendedBlock blk = entry.getKey();
+ Set<DatanodeInfo> dnSet = entry.getValue();
+ if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+ || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+ DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+ int i = 0;
+ for (DatanodeInfo dn:dnSet) {
+ locs[i++] = dn;
+ }
+ LocatedBlock[] lblocks = { new LocatedBlock(blk, locs) };
+ dfsClient.reportChecksumFailure(src, lblocks);
+ }
+ corruptedBlockMap.clear();
+ }
+
@Override
public long skip(long n) throws IOException {
if ( n > 0 ) {
@@ -760,9 +845,10 @@
}
} catch (IOException e) {//make following read to retry
if(DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Exception while seek to " + targetPos +
- " from " + currentBlock +" of " + src + " from " +
- currentNode + ": " + StringUtils.stringifyException(e));
+ DFSClient.LOG.debug("Exception while seek to " + targetPos
+ + " from " + getCurrentBlock() + " of " + src
+ + " from " + currentNode + ": "
+ + StringUtils.stringifyException(e));
}
}
}
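
A note on the bookkeeping above: addIntoCorruptedBlockMap is a plain multimap
insert. A minimal self-contained sketch of the same idea (String stands in for
ExtendedBlock/DatanodeInfo purely for illustration; Set.add is already
idempotent, so the explicit contains() check in the patch is not strictly
needed):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CorruptReplicaTracker {
      // block -> data nodes on which checksum errors were seen for that block
      private final Map<String, Set<String>> corrupted =
          new HashMap<String, Set<String>>();

      public void addFailure(String block, String node) {
        Set<String> nodes = corrupted.get(block);
        if (nodes == null) {
          nodes = new HashSet<String>();
          corrupted.put(block, nodes);
        }
        nodes.add(node); // idempotent, so no contains() check is required
      }
    }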
diff --git a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index a00207d..c5c12f0 100644
--- a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -689,6 +689,9 @@
* is corrupt but we will report both to the namenode. In the future,
* we can consider figuring out exactly which block is corrupt.
*/
+ // We do not see a need for users to report block checksum errors and do
+ // not want to rely on users to report block corruption.
+ @Deprecated
public boolean reportChecksumFailure(Path f,
FSDataInputStream in, long inPos,
FSDataInputStream sums, long sumsPos) {
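
With reporting handled inside DFSInputStream, an application no longer needs
to call reportChecksumFailure itself; simply reading the file is enough. A
minimal sketch, assuming the FileSystem is backed by HDFS (the path is
hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/some/file"));
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf, 0, buf.length)) > 0) {
      // process buf[0..n); checksum failures seen during these reads are
      // reported to the namenode by the DFS client automatically
    }
    in.close();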
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
new file mode 100644
index 0000000..97387cc
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestClientReportBadBlock.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import junit.framework.Assert;
+
+/**
+ * Tests the client reporting of corrupted block replicas to the name node.
+ * The reporting policy: when a block has more than one replica and all of
+ * them are corrupted, the client does not report (the fault may lie with
+ * the client itself); when only some of the replicas are corrupted, the
+ * client reports those corrupted replicas. When a block has a single
+ * replica, the client always reports it if corrupted.
+ */
+public class TestClientReportBadBlock {
+ private static final Log LOG = LogFactory
+ .getLog(TestClientReportBadBlock.class);
+
+ static final long BLOCK_SIZE = 64 * 1024;
+ private static int buffersize;
+ private static MiniDFSCluster cluster;
+ private static DistributedFileSystem dfs;
+ private static int numDataNodes = 3;
+ private static final Configuration conf = new HdfsConfiguration();
+
+ Random rand = new Random();
+
+ @Before
+ public void startUpCluster() throws IOException {
+ if (System.getProperty("test.build.data") == null) {
+ // to allow the test to be run outside of Ant
+ System.setProperty("test.build.data", "build/test/data");
+ }
+ // disable block scanner
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
+ .build();
+ cluster.waitActive();
+ dfs = (DistributedFileSystem) cluster.getFileSystem();
+ buffersize = conf.getInt("io.file.buffer.size", 4096);
+ }
+
+ @After
+ public void shutDownCluster() throws IOException {
+ dfs.close();
+ cluster.shutdown();
+ }
+
+ /*
+ * This test creates a file with one block replica, corrupts the block, and
+ * makes the DFSClient read the corrupted file. The corrupted block is
+ * expected to be reported to the name node.
+ */
+ @Test
+ public void testOneBlockReplica() throws Exception {
+ final short repl = 1;
+ final int corruptBlockNumber = 1;
+ for (int i = 0; i < 2; i++) {
+ // create a file
+ String fileName = "/tmp/testClientReportBadBlock/OneBlockReplica" + i;
+ Path filePath = new Path(fileName);
+ createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
+ if (i == 0) {
+ dfsClientReadFile(filePath);
+ } else {
+ dfsClientReadFileFromPosition(filePath);
+ }
+ // The only block replica is corrupted, so the LocatedBlock should be
+ // marked as corrupt. The corrupted replica is still expected to be
+ // returned by Namenode#getBlockLocations() since all (one) replicas
+ // are corrupted.
+ int expectedReplicaCount = 1;
+ verifyCorruptedBlockCount(filePath, expectedReplicaCount);
+ verifyFirstBlockCorrupted(filePath, true);
+ verifyFsckBlockCorrupted();
+ testFsckListCorruptFilesBlocks(filePath, -1);
+ }
+ }
+
+ /**
+ * This test creates a file with three block replicas. Corrupt all of the
+ * replicas. Make dfs client read the file. No block corruption should be
+ * reported.
+ */
+ @Test
+ public void testCorruptAllOfThreeReplicas() throws Exception {
+ final short repl = 3;
+ final int corruptBlockNumber = 3;
+ for (int i = 0; i < 2; i++) {
+ // create a file
+ String fileName = "/tmp/testClientReportBadBlock/testCorruptAllReplicas"
+ + i;
+ Path filePath = new Path(fileName);
+ createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
+ // ask dfs client to read the file
+ if (i == 0) {
+ dfsClientReadFile(filePath);
+ } else {
+ dfsClientReadFileFromPosition(filePath);
+ }
+ // Since all replicas are corrupted, we expect the DFSClient NOT to
+ // report corrupted replicas to the name node.
+ int expectedReplicasReturned = repl;
+ verifyCorruptedBlockCount(filePath, expectedReplicasReturned);
+ // LocatedBlock should not have the block marked as corrupted.
+ verifyFirstBlockCorrupted(filePath, false);
+ verifyFsckHealth("");
+ testFsckListCorruptFilesBlocks(filePath, 0);
+ }
+ }
+
+ /**
+ * This test creates a file with three block replicas and corrupts two of
+ * them. Make the dfs client read the file; the corrupted replicas, along
+ * with the data nodes hosting them, should be reported to the name node.
+ */
+ @Test
+ public void testCorruptTwoOutOfThreeReplicas() throws Exception {
+ final short repl = 3;
+ final int corruptBlockReplicas = 2;
+ for (int i = 0; i < 2; i++) {
+ String fileName =
+ "/tmp/testClientReportBadBlock/CorruptTwoOutOfThreeReplicas" + i;
+ Path filePath = new Path(fileName);
+ createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockReplicas);
+ int replicaCount = 0;
+ /*
+ * The order of data nodes in the LocatedBlock returned by the name node
+ * is sorted by NetworkTopology#pseudoSortByDistance. In the current
+ * MiniDFSCluster, the sorting is effectively random: the DFS client and
+ * the simulated data nodes are considered to be neither on the same host
+ * nor on the same rack. Therefore, even though we corrupt the first two
+ * block replicas in order, there is no guarantee whether good or bad
+ * replicas are tried first when the DFSClient reads. So we re-read the
+ * file until the expected number of replicas is returned.
+ */
+ while (replicaCount != repl - corruptBlockReplicas) {
+ if (i == 0) {
+ dfsClientReadFile(filePath);
+ } else {
+ dfsClientReadFileFromPosition(filePath);
+ }
+ LocatedBlocks blocks = dfs.dfs.getNamenode().
+ getBlockLocations(filePath.toString(), 0, Long.MAX_VALUE);
+ replicaCount = blocks.get(0).getLocations().length;
+ }
+ verifyFirstBlockCorrupted(filePath, false);
+ int expectedReplicaCount = repl - corruptBlockReplicas;
+ verifyCorruptedBlockCount(filePath, expectedReplicaCount);
+ verifyFsckHealth("Target Replicas is 3 but found 1 replica");
+ testFsckListCorruptFilesBlocks(filePath, 0);
+ }
+ }
+
+ /**
+ * Create a file with one block and corrupt some or all of its replicas.
+ */
+ private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
+ int corruptBlockCount) throws IOException, AccessControlException,
+ FileNotFoundException, UnresolvedLinkException {
+ DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
+ DFSTestUtil.waitReplication(dfs, filePath, repl);
+ // Locate the file blocks by asking name node
+ final LocatedBlocks locatedblocks = dfs.dfs.getNamenode()
+ .getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE);
+ Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length);
+ // The file only has one block
+ LocatedBlock lblock = locatedblocks.get(0);
+ DatanodeInfo[] datanodeinfos = lblock.getLocations();
+ ExtendedBlock block = lblock.getBlock();
+ // corrupt some or all of the block replicas
+ for (int i = 0; i < corruptBlockCount; i++) {
+ DatanodeInfo dninfo = datanodeinfos[i];
+ final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
+ corruptBlock(block, dn);
+ LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
+ + dninfo.getName());
+
+ }
+ }
+
+ /**
+ * Verify that the first block of the file is corrupted (for all of its replicas).
+ */
+ private void verifyFirstBlockCorrupted(Path filePath, boolean isCorrupted)
+ throws AccessControlException, FileNotFoundException,
+ UnresolvedLinkException, IOException {
+ final LocatedBlocks locatedBlocks = dfs.dfs.getNamenode()
+ .getBlockLocations(filePath.toUri().getPath(), 0, Long.MAX_VALUE);
+ final LocatedBlock firstLocatedBlock = locatedBlocks.get(0);
+ Assert.assertEquals(isCorrupted, firstLocatedBlock.isCorrupt());
+ }
+
+ /**
+ * Verify the number of corrupted block replicas by fetching the block
+ * location from name node.
+ */
+ private void verifyCorruptedBlockCount(Path filePath, int expectedReplicas)
+ throws AccessControlException, FileNotFoundException,
+ UnresolvedLinkException, IOException {
+ final LocatedBlocks lBlocks = dfs.dfs.getNamenode().getBlockLocations(
+ filePath.toUri().getPath(), 0, Long.MAX_VALUE);
+ // we expect only the first block of the file is used for this test
+ LocatedBlock firstLocatedBlock = lBlocks.get(0);
+ Assert.assertEquals(expectedReplicas,
+ firstLocatedBlock.getLocations().length);
+ }
+
+ /**
+ * Ask dfs client to read the file
+ */
+ private void dfsClientReadFile(Path corruptedFile) throws IOException,
+ UnresolvedLinkException {
+ DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
+ byte[] buf = new byte[buffersize];
+ int nRead = 0; // number of bytes read by the last read()
+
+ try {
+ do {
+ nRead = in.read(buf, 0, buf.length);
+ } while (nRead > 0);
+ } catch (ChecksumException ce) {
+ // a ChecksumException is expected if all replicas are bad; ignore and continue.
+ LOG.debug("DfsClientReadFile caught ChecksumException.");
+ } catch (BlockMissingException bme) {
+ // caught BlockMissingException, ignore.
+ LOG.debug("DfsClientReadFile caught BlockMissingException.");
+ }
+ }
+
+ /**
+ * The DFS client reads bytes starting from the specified position (positional read).
+ */
+ private void dfsClientReadFileFromPosition(Path corruptedFile)
+ throws UnresolvedLinkException, IOException {
+ DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
+ byte[] buf = new byte[buffersize];
+ int startPosition = 2;
+ int nRead = 0; // number of bytes read by the last read()
+ try {
+ while ((nRead = in.read(startPosition, buf, 0, buf.length)) > 0) {
+ // advance by the number of bytes actually read, not the buffer size
+ startPosition += nRead;
+ }
+ } catch (BlockMissingException bme) {
+ LOG.debug("DfsClientReadFile caught BlockMissingException.");
+ }
+ }
+
+ /**
+ * Corrupt a block on a data node by replacing the block file content with
+ * the byte sequence 0, 1, ..., BLOCK_SIZE-1 (truncated to bytes).
+ *
+ * @param block
+ * the ExtendedBlock to be corrupted
+ * @param dn
+ * the data node where the block needs to be corrupted
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
+ throws FileNotFoundException, IOException {
+ final FSDataset data = (FSDataset) dn.getFSDataset();
+ final RandomAccessFile raFile = new RandomAccessFile(
+ data.getBlockFile(block), "rw");
+ final byte[] bytes = new byte[(int) BLOCK_SIZE];
+ for (int i = 0; i < BLOCK_SIZE; i++) {
+ bytes[i] = (byte) (i);
+ }
+ raFile.write(bytes);
+ raFile.close();
+ }
+
+ private static void verifyFsckHealth(String expected) throws Exception {
+ // Fsck health has error code 0.
+ // Make sure filesystem is in healthy state
+ String outStr = runFsck(conf, 0, true, "/");
+ LOG.info(outStr);
+ Assert.assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+ if (!expected.equals("")) {
+ Assert.assertTrue(outStr.contains(expected));
+ }
+ }
+
+ private static void verifyFsckBlockCorrupted() throws Exception {
+ String outStr = runFsck(conf, 1, true, "/");
+ LOG.info(outStr);
+ Assert.assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+ }
+
+ private static void testFsckListCorruptFilesBlocks(Path filePath, int errorCode) throws Exception{
+ String outStr = runFsck(conf, errorCode, true, filePath.toString(), "-list-corruptfileblocks");
+ LOG.info("fsck -list-corruptfileblocks out: " + outStr);
+ if (errorCode != 0) {
+ Assert.assertTrue(outStr.contains("CORRUPT files"));
+ }
+ }
+
+ static String runFsck(Configuration conf, int expectedErrCode,
+ boolean checkErrorCode, String... path) throws Exception {
+ ByteArrayOutputStream bStream = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bStream, true);
+ int errCode = ToolRunner.run(new DFSck(conf, out), path);
+ if (checkErrorCode)
+ Assert.assertEquals(expectedErrCode, errCode);
+ return bStream.toString();
+ }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index f371daf..ea957d7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -341,16 +341,17 @@
int replicaCount = 0;
Random random = new Random();
String outStr = null;
+ short factor = 1;
MiniDFSCluster cluster = null;
try {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
Path file1 = new Path("/testCorruptBlock");
- DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
+ DFSTestUtil.createFile(fs, file1, 1024, factor, 0);
// Wait until file replication has completed
- DFSTestUtil.waitReplication(fs, file1, (short)3);
+ DFSTestUtil.waitReplication(fs, file1, factor);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
// Make sure filesystem is in healthy state
@@ -358,18 +359,16 @@
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
- // corrupt replicas
- for (int i=0; i < 3; i++) {
- File blockFile = MiniDFSCluster.getBlockFile(i, block);
- if (blockFile != null && blockFile.exists()) {
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- FileChannel channel = raFile.getChannel();
- String badString = "BADBAD";
- int rand = random.nextInt((int)channel.size()/2);
- raFile.seek(rand);
- raFile.write(badString.getBytes());
- raFile.close();
- }
+ // corrupt the single replica (if all replicas of a multi-replica block
+ // were corrupted, the client would no longer report the corruption)
+ File blockFile = MiniDFSCluster.getBlockFile(0, block);
+ if (blockFile != null && blockFile.exists()) {
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ FileChannel channel = raFile.getChannel();
+ String badString = "BADBAD";
+ int rand = random.nextInt((int) channel.size()/2);
+ raFile.seek(rand);
+ raFile.write(badString.getBytes());
+ raFile.close();
}
// Read the file to trigger reportBadBlocks
try {
@@ -384,7 +383,7 @@
blocks = dfsClient.getNamenode().
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
- while (replicaCount != 3) {
+ while (replicaCount != factor) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {