/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests that the client reports corrupted block replicas to the name node.
 * The reporting policy is: if a block has more than one replica and all
 * replicas are corrupted, the client does not report (since the fault may
 * lie with the client itself); if only some of the replicas are corrupted,
 * the client reports the corrupted replicas. If a block has only one
 * replica, the client always reports a corrupted replica.
 */
public class TestClientReportBadBlock {
private static final Log LOG = LogFactory
.getLog(TestClientReportBadBlock.class);
static final long BLOCK_SIZE = 64 * 1024;
private static int buffersize;
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
private static final int numDataNodes = 3;
private static final Configuration conf = new HdfsConfiguration();
Random rand = new Random();
@Before
public void startUpCluster() throws IOException {
// disable block scanner
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
.build();
cluster.waitActive();
dfs = cluster.getFileSystem();
buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
}
@After
public void shutDownCluster() throws IOException {
if (dfs != null) {
dfs.close();
dfs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
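/*
 * For reference, a minimal sketch (not used by the tests below) of how a
 * corrupt-replica report reaches the name node: it assumes the
 * ClientProtocol#reportBadBlocks RPC, which DFSClient invokes internally
 * when it detects a checksum mismatch while reading a replica.
 */
private void reportFirstBlockAsBad(Path filePath) throws IOException {
final LocatedBlocks blocks = dfs.dfs.getNamenode()
.getBlockLocations(filePath.toUri().getPath(), 0, Long.MAX_VALUE);
// Explicitly report the replicas of the first block as corrupt; the tests
// in this class instead rely on DFSClient doing this during reads.
dfs.dfs.getNamenode().reportBadBlocks(new LocatedBlock[] { blocks.get(0) });
}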
/**
 * This test creates a file with one block replica, corrupts the block, and
 * makes DFSClient read the corrupted file. The corrupted block is expected
 * to be reported to the name node.
 */
@Test
public void testOneBlockReplica() throws Exception {
final short repl = 1;
final int corruptBlockNumber = 1;
for (int i = 0; i < 2; i++) {
// create a file
String fileName = "/tmp/testClientReportBadBlock/OneBlockReplica" + i;
Path filePath = new Path(fileName);
createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
if (i == 0) {
dfsClientReadFile(filePath);
} else {
dfsClientReadFileFromPosition(filePath);
}
// The only block replica is corrupted, so the LocatedBlock should be
// marked as corrupt. The corrupted replica is still expected to be
// returned by Namenode#getBlockLocations(), since all (here, the single)
// replicas are corrupted.
int expectedReplicaCount = 1;
verifyCorruptedBlockCount(filePath, expectedReplicaCount);
verifyFirstBlockCorrupted(filePath, true);
verifyFsckBlockCorrupted();
testFsckListCorruptFilesBlocks(filePath, -1);
}
}
/**
 * This test creates a file with three block replicas, corrupts all of the
 * replicas, and makes the DFS client read the file. No block corruption
 * should be reported.
 */
@Test
public void testCorruptAllOfThreeReplicas() throws Exception {
final short repl = 3;
final int corruptBlockNumber = 3;
for (int i = 0; i < 2; i++) {
// create a file
String fileName = "/tmp/testClientReportBadBlock/testCorruptAllReplicas"
+ i;
Path filePath = new Path(fileName);
createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockNumber);
// ask dfs client to read the file
if (i == 0) {
dfsClientReadFile(filePath);
} else {
dfsClientReadFileFromPosition(filePath);
}
// As all replicas are corrupted, we expect DFSClient NOT to report the
// corrupted replicas to the name node.
int expectedReplicasReturned = repl;
verifyCorruptedBlockCount(filePath, expectedReplicasReturned);
// LocatedBlock should not have the block marked as corrupted.
verifyFirstBlockCorrupted(filePath, false);
verifyFsckHealth("");
testFsckListCorruptFilesBlocks(filePath, 0);
}
}
/**
 * This test creates a file with three block replicas, corrupts two of the
 * replicas, and makes the DFS client read the file. The corrupted replicas,
 * along with the data nodes that hold them, should be reported to the name
 * node.
 */
@Test
public void testCorruptTwoOutOfThreeReplicas() throws Exception {
final short repl = 3;
final int corruptBlockReplicas = 2;
for (int i = 0; i < 2; i++) {
String fileName =
"/tmp/testClientReportBadBlock/CorruptTwoOutOfThreeReplicas"+ i;
Path filePath = new Path(fileName);
createAFileWithCorruptedBlockReplicas(filePath, repl, corruptBlockReplicas);
int replicaCount = 0;
/*
 * The order of data nodes in a LocatedBlock returned by the name node is
 * sorted by NetworkTopology#pseudoSortByDistance. In the current
 * MiniDFSCluster this ordering is effectively random, because the DFS
 * client and the simulated data nodes are considered to be neither on the
 * same host nor on the same rack. Therefore, even though we corrupted the
 * first two block replicas in order, there is no guarantee whether good or
 * bad replicas are returned to the client first. So we re-read the file
 * until the expected number of replicas is returned.
 */
while (replicaCount != repl - corruptBlockReplicas) {
if (i == 0) {
dfsClientReadFile(filePath);
} else {
dfsClientReadFileFromPosition(filePath);
}
LocatedBlocks blocks = dfs.dfs.getNamenode().
getBlockLocations(filePath.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
}
verifyFirstBlockCorrupted(filePath, false);
int expectedReplicaCount = repl - corruptBlockReplicas;
verifyCorruptedBlockCount(filePath, expectedReplicaCount);
verifyFsckHealth("Target Replicas is 3 but found 1 live replica");
testFsckListCorruptFilesBlocks(filePath, 0);
}
}
/**
* Create a file with one block and corrupt some/all of the block replicas.
*/
private void createAFileWithCorruptedBlockReplicas(Path filePath, short repl,
int corruptBlockCount) throws IOException, AccessControlException,
FileNotFoundException, UnresolvedLinkException, InterruptedException, TimeoutException {
DFSTestUtil.createFile(dfs, filePath, BLOCK_SIZE, repl, 0);
DFSTestUtil.waitReplication(dfs, filePath, repl);
// Locate the file blocks by asking the name node
final LocatedBlocks locatedblocks = dfs.dfs.getNamenode()
.getBlockLocations(filePath.toString(), 0L, BLOCK_SIZE);
Assert.assertEquals(repl, locatedblocks.get(0).getLocations().length);
// The file only has one block
LocatedBlock lblock = locatedblocks.get(0);
DatanodeInfo[] datanodeinfos = lblock.getLocations();
ExtendedBlock block = lblock.getBlock();
// Corrupt some or all of the block replicas
for (int i = 0; i < corruptBlockCount; i++) {
DatanodeInfo dninfo = datanodeinfos[i];
final DataNode dn = cluster.getDataNode(dninfo.getIpcPort());
cluster.corruptReplica(dn, block);
LOG.debug("Corrupted block " + block.getBlockName() + " on data node "
+ dninfo);
}
}
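/*
 * A minimal sketch (not exercised by the tests above) of how a test could
 * block until the name node has registered a given number of corrupt
 * replicas, assuming the DFSTestUtil#waitCorruptReplicas helper.
 */
private void waitForCorruptReplicas(Path filePath, ExtendedBlock block,
int corruptReplicas) throws Exception {
// Polls the name node until its corrupt-replica count for this block
// reaches the expected value, or times out.
DFSTestUtil.waitCorruptReplicas(dfs, cluster.getNamesystem(), filePath,
block, corruptReplicas);
}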
/**
 * Verify that the first block of the file is corrupted (for all of its
 * replicas).
 */
private void verifyFirstBlockCorrupted(Path filePath, boolean isCorrupted)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final LocatedBlocks locatedBlocks = dfs.dfs.getNamenode()
.getBlockLocations(filePath.toUri().getPath(), 0, Long.MAX_VALUE);
final LocatedBlock firstLocatedBlock = locatedBlocks.get(0);
Assert.assertEquals(isCorrupted, firstLocatedBlock.isCorrupt());
}
/**
 * Verify the number of corrupted block replicas by fetching the block
 * locations from the name node.
 */
private void verifyCorruptedBlockCount(Path filePath, int expectedReplicas)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
final LocatedBlocks lBlocks = dfs.dfs.getNamenode().getBlockLocations(
filePath.toUri().getPath(), 0, Long.MAX_VALUE);
// We expect only the first block of the file to be used in this test
LocatedBlock firstLocatedBlock = lBlocks.get(0);
Assert.assertEquals(expectedReplicas,
firstLocatedBlock.getLocations().length);
}
/**
 * Ask the DFS client to read the file sequentially from the beginning.
 */
private void dfsClientReadFile(Path corruptedFile) throws IOException,
UnresolvedLinkException {
DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
byte[] buf = new byte[buffersize];
int nRead = 0; // number of bytes read by the most recent read()
try {
do {
nRead = in.read(buf, 0, buf.length);
} while (nRead > 0);
} catch (ChecksumException ce) {
// A ChecksumException is expected when all replicas are bad; ignore and
// continue.
LOG.debug("DfsClientReadFile caught ChecksumException.");
} catch (BlockMissingException bme) {
// A BlockMissingException is also possible; ignore it as well.
LOG.debug("DfsClientReadFile caught BlockMissingException.");
} finally {
in.close();
}
}
/**
 * Ask the DFS client to read the file via positional reads (pread),
 * starting from the specified position.
 */
private void dfsClientReadFileFromPosition(Path corruptedFile)
throws UnresolvedLinkException, IOException {
DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
byte[] buf = new byte[buffersize];
int startPosition = 2;
int nRead = 0; // number of bytes read by the most recent read()
try {
do {
nRead = in.read(startPosition, buf, 0, buf.length);
// Advance by the number of bytes actually read rather than the buffer
// size, so that a short read does not skip data.
startPosition += nRead;
} while (nRead > 0);
} catch (BlockMissingException bme) {
LOG.debug("DfsClientReadFile caught BlockMissingException.");
} finally {
in.close();
}
}
private static void verifyFsckHealth(String expected) throws Exception {
// A healthy fsck run exits with error code 0.
// Make sure the file system is in a healthy state.
String outStr = runFsck(conf, 0, true, "/");
LOG.info(outStr);
Assert.assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
if (!expected.equals("")) {
Assert.assertTrue(outStr.contains(expected));
}
}
private static void verifyFsckBlockCorrupted() throws Exception {
String outStr = runFsck(conf, 1, true, "/");
LOG.info(outStr);
Assert.assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
}
private static void testFsckListCorruptFilesBlocks(Path filePath, int errorCode) throws Exception {
String outStr = runFsck(conf, errorCode, true, filePath.toString(), "-list-corruptfileblocks");
LOG.info("fsck -list-corruptfileblocks out: " + outStr);
if (errorCode != 0) {
Assert.assertTrue(outStr.contains("CORRUPT files"));
}
}
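/*
 * For reference, the fsck invocations above correspond to the command line
 * (the path is an example):
 *
 *   hdfs fsck /tmp/testClientReportBadBlock -list-corruptfileblocks
 *
 * DFSck is the class backing the "hdfs fsck" command.
 */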
static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode, String... path) throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
int errCode = ToolRunner.run(new DFSck(conf, out), path);
if (checkErrorCode) {
Assert.assertEquals(expectedErrCode, errCode);
}
return bStream.toString();
}
}