/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Tests the client reporting of corrupted block replicas to the name node.
* The reporting policy is: if a block has more than one replica and all of
* the replicas are corrupted, the client does not report (since the client
* itself may be at fault). If only some of the replicas are corrupted, the
* client reports those corrupted replicas. If a block has only one replica,
* the client always reports the corrupted replica.
*/
public class TestClientReportBadBlock {
private static final Log LOG = LogFactory
.getLog(TestClientReportBadBlock.class);
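// Keep the block size small so that each test file fits in a single block.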
static final long BLOCK_SIZE = 64 * 1024;
private static int buffersize;
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
private static int numDataNodes = 3;
private static final Configuration conf = new HdfsConfiguration();
Random rand = new Random();
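/**
* Bring up a MiniDFSCluster before each test, with the data node block
* scanner disabled.
*/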
@Before
public void startUpCluster() throws IOException {
// Allow the test to be run outside of Ant.
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "build/test/data");
}
// Disable the block scanner so that corruption is found and reported by the
// client rather than by the background scanner.
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
.build();
cluster.waitActive();
dfs = (DistributedFileSystem) cluster.getFileSystem();
buffersize = conf.getInt("io.file.buffer.size", 4096);
}
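/**
* Shut down the cluster after each test.
*/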
@After
public void shutDownCluster() throws IOException {
dfs.close();
cluster.shutdown();
}
/*
* This test creates a file with one block replica, corrupts the block, and
* makes the DFSClient read the corrupted file. The corrupted block is
* expected to be reported to the name node.
*/
@Test
public void testOneBlockReplica() throws Exception {
final short repl = 1;
final int corruptBlockNumber = 1;
for (int i = 0; i < 2; i++) {
// create a file
String fileName = "/tmp/testClientReportBadBlock/OneBlockReplica" + i;
Path filePath = new Path(fileName);
createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
if (i == 0) {
dfsClientReadFile(filePath);
} else {
dfsClientReadFileFromPosition(filePath);
}
// The only block replica is corrupted, so the LocatedBlock should be marked
// as corrupt. The corrupted replica is still expected to be returned by
// Namenode#getBlockLocations(), since all replicas (here, the only one) are
// corrupted.
int expectedReplicaCount = 1;
verifyCorruptedBlockCount(filePath, expectedReplicaCount);
verifyFirstBlockCorrupted(filePath, true);
verifyFsckBlockCorrupted();
testFsckListCorruptFilesBlocks(filePath, -1);
}
}
/**
* This test creates a file with three block replicas and corrupts all of
* them. When the dfs client reads the file, no block corruption should be
* reported.
*/
@Test
public void testCorruptAllOfThreeReplicas() throws Exception {
final short repl = 3;
final int corruptBlockNumber = 3;
for (int i = 0; i < 2; i++) {
// create a file
String fileName = "/tmp/testClientReportBadBlock/testCorruptAllReplicas"
+ i;
Path filePath = new Path(fileName);
createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
// ask dfs client to read the file
if (i == 0) {
dfsClientReadFile(filePath);
} else {
dfsClientReadFileFromPosition(filePath);
}
// Since all replicas are corrupted, we expect the DFSClient NOT to report
// the corrupted replicas to the name node.
int expectedReplicasReturned = repl;
verifyCorruptedBlockCount(filePath, expectedReplicasReturned);
// LocatedBlock should not have the block marked as corrupted.
verifyFirstBlockCorrupted(filePath, false);
verifyFsckHealth("");
testFsckListCorruptFilesBlocks(filePath, 0);
}
}
/**
* This test creates a file with three block replicas and corrupts two of
* them. When the dfs client reads the file, the corrupted blocks and their
* owning data nodes should be reported to the name node.
*/
@Test
public void testCorruptTwoOutOfThreeReplicas() throws Exception {
final short repl = 3;
final int corruptBlockReplicas = 2;
for (int i = 0; i < 2; i++) {
String fileName =
"/tmp/testClientReportBadBlock/CorruptTwoOutOfThreeReplicas" + i;
Path filePath = new Path(fileName);
createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockReplicas);
int replicaCount = 0;
/*
* The data nodes in the LocatedBlock returned by the name node are sorted
* by NetworkTopology#pseudoSortByDistance. With the current MiniDFSCluster
* that order is effectively random, because the DFS client and the
* simulated data nodes are considered to be neither on the same host nor
* on the same rack. Therefore, even though we corrupt the first two block
* replicas in order, there is no guarantee whether good or bad replicas
* are read first. So we re-read the file until the expected number of
* replicas is returned.
*/
while (replicaCount != repl - corruptBlockReplicas) {
if (i == 0) {
dfsClientReadFile(filePath);
} else {
dfsClientReadFileFromPosition(filePath);
}
LocatedBlocks blocks = dfs.dfs.getNamenode().
getBlockLocations(filePath.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
}
verifyFirstBlockCorrupted(filePath, false);
int expectedReplicaCount = repl - corruptBlockReplicas;
verifyCorruptedBlockCount(filePath, expectedReplicaCount);
verifyFsckHealth("Target Replicas is 3 but found 1 replica");
testFsckListCorruptFilesBlocks(filePath, 0);
}
}
/**
* Create a file with one block and corrupt some or all of its block
* replicas.
*/
private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
int corruptBlockCount) throws IOException, AccessControlException,
FileNotFoundException, UnresolvedLinkException {
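// Create a single-block file and wait until it is fully replicated.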
DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
DFSTestUtil.waitReplication(dfs, filePath, repl);
// Locate the file's blocks by asking the name node
final LocatedBlocks locatedblocks = dfs.dfs.getNamenode()
.getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE);
Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length);
// The file only has one block
LocatedBlock lblock = locatedblocks.get(0);
DatanodeInfo[] datanodeinfos = lblock.getLocations();
ExtendedBlock block = lblock.getBlock();
// corrupt some or all of the block replicas
for (int i = 0; i < corruptBlockCount; i++) {
DatanodeInfo dninfo = datanodeinfos[i];
final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
corruptBlock(block, dn);
LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
+ dninfo.getName());
}
}
/**
* Verify whether the first block of the file is marked as corrupt; a block
* is marked corrupt only when all of its replicas are corrupt.
*/
private void verifyFirstBlockCorrupted(Path filePath, boolean isCorrupted)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final LocatedBlocks locatedBlocks = dfs.dfs.getNamenode()
.getBlockLocations(filePath.toUri().getPath(), 0, Long.MAX_VALUE);
final LocatedBlock firstLocatedBlock = locatedBlocks.get(0);
Assert.assertEquals(isCorrupted, firstLocatedBlock.isCorrupt());
}
/**
* Verify the number of block replica locations returned when fetching the
* first block's locations from the name node.
*/
private void verifyCorruptedBlockCount(Path filePath, int expectedReplicas)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final LocatedBlocks lBlocks = dfs.dfs.getNamenode().getBlockLocations(
filePath.toUri().getPath(), 0, Long.MAX_VALUE);
// Only the first block of the file is used in this test.
LocatedBlock firstLocatedBlock = lBlocks.get(0);
Assert.assertEquals(expectedReplicas,
firstLocatedBlock.getLocations().length);
}
/**
* Ask the dfs client to sequentially read the whole file.
*/
private void dfsClientReadFile(Path corruptedFile) throws IOException,
UnresolvedLinkException {
DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
byte[] buf = new byte[buffersize];
int nRead = 0; // number of bytes returned by the last read call
try {
do {
nRead = in.read(buf, 0, buf.length);
} while (nRead > 0);
} catch (ChecksumException ce) {
// A ChecksumException is expected when all replicas are bad; ignore it and
// continue.
LOG.debug("DfsClientReadFile caught ChecksumException.");
} catch (BlockMissingException bme) {
// A BlockMissingException may be thrown as well; ignore it.
LOG.debug("DfsClientReadFile caught BlockMissingException.");
} finally {
in.close();
}
}
/**
* Ask the dfs client to read bytes starting from the specified position,
* exercising the positional-read (pread) code path.
*/
private void dfsClientReadFileFromPosition(Path corruptedFile)
throws UnresolvedLinkException, IOException {
DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
byte[] buf = new byte[buffersize];
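// Start at an arbitrary small non-zero offset so that the read does not
// begin at the start of the block.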
int startPosition = 2;
int nRead = 0; // number of bytes returned by the last read call
try {
do {
nRead = in.read(startPosition, buf, 0, buf.length);
startPosition += buf.length;
} while (nRead > 0);
} catch (BlockMissingException bme) {
LOG.debug("DfsClientReadFileFromPosition caught BlockMissingException.");
} finally {
in.close();
}
}
/**
* Corrupt a block on a data node by overwriting the on-disk block file with
* the byte sequence 0, 1, ..., BLOCK_SIZE - 1 (cast to bytes).
*
* @param block
* the ExtendedBlock to be corrupted
* @param dn
* the data node where the block is to be corrupted
* @throws FileNotFoundException
* @throws IOException
*/
private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
throws FileNotFoundException, IOException {
final File f = DataNodeTestUtils.getBlockFile(
dn, block.getBlockPoolId(), block.getLocalBlock());
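// Overwrite the block data on disk. The replica's checksum (meta) file is
// left untouched, so subsequent reads of this replica will fail checksum
// verification.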
final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
try {
final byte[] bytes = new byte[(int) BLOCK_SIZE];
for (int i = 0; i < BLOCK_SIZE; i++) {
bytes[i] = (byte) (i);
}
raFile.write(bytes);
} finally {
raFile.close();
}
}
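/**
* Run fsck on the root directory and verify that the file system is
* reported healthy. If {@code expected} is non-empty, also verify that it
* appears in the fsck output.
*/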
private static void verifyFsckHealth(String expected) throws Exception {
// fsck exits with code 0 when the file system is healthy.
String outStr = runFsck(conf, 0, true, "/");
LOG.info(outStr);
Assert.assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
if (!expected.equals("")) {
Assert.assertTrue(outStr.contains(expected));
}
}
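/** Run fsck on the root directory and verify that it reports corruption. */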
private static void verifyFsckBlockCorrupted() throws Exception {
String outStr = runFsck(conf, 1, true, "/");
LOG.info(outStr);
Assert.assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
}
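/**
* Run fsck -list-corruptfileblocks on the given path and, when a non-zero
* exit code is expected, verify that corrupt files are listed in the output.
*/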
private static void testFsckListCorruptFilesBlocks(Path filePath, int errorCode) throws Exception {
String outStr = runFsck(conf, errorCode, true, filePath.toString(), "-list-corruptfileblocks");
LOG.info("fsck -list-corruptfileblocks out: " + outStr);
if (errorCode != 0) {
Assert.assertTrue(outStr.contains("CORRUPT files"));
}
}
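/**
* Run fsck with the given arguments, capturing its output; optionally
* assert that the exit code matches the expected value.
*/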
static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode, String... path) throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
int errCode = ToolRunner.run(new DFSck(conf, out), path);
if (checkErrorCode) {
Assert.assertEquals(expectedErrCode, errCode);
}
return bStream.toString();
}
}