blob: cb2ec118e16ebe7bdd82c7a9d56159da5edf90e7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
public class TestReadStripedFileWithDecoding {
  static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);

  private MiniDFSCluster cluster;
  private DistributedFileSystem fs;
  // One length just below and one just above blockSize * dataBlocks, so both
  // a single partial block group and a file spilling past it are exercised.
  private final int smallFileLength = blockSize * dataBlocks - 123;
  private final int largeFileLength = blockSize * dataBlocks + 123;
  private final int[] fileLengths = {smallFileLength, largeFileLength};
  private final int[] dnFailureNums = {1, 2, 3};

  /**
   * Start a fresh MiniDFSCluster with {@code numDNs} datanodes and create an
   * erasure coding zone at "/" (null schema argument) so the test files are
   * written striped.
   */
  @Before
  public void setup() throws IOException {
    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
        .numDataNodes(numDNs).build();
    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
    fs = cluster.getFileSystem();
  }

  /**
   * Shut down the cluster if one is running. The field is cleared afterwards
   * so repeated calls (a test tearing down explicitly, then {@code @After})
   * are no-ops instead of shutting the same cluster down twice.
   */
  @After
  public void tearDown() throws IOException {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Shutdown tolerable number of Datanode before reading.
   * Verify the decoding works correctly.
   *
   * Each (file length, failure count) combination runs on its own fresh
   * cluster so datanodes killed by one run do not affect the next one.
   */
  @Test(timeout=300000)
  public void testReadWithDNFailure() throws IOException {
    // Release the cluster started by @Before; every iteration builds its own,
    // otherwise the first cluster would be overwritten and leaked.
    tearDown();
    for (int fileLength : fileLengths) {
      for (int dnFailureNum : dnFailureNums) {
        try {
          // setup a new cluster with no dead datanode
          setup();
          testReadWithDNFailure(fileLength, dnFailureNum);
        } catch (IOException ioe) {
          String msg = "Failed to read file with DN failure:"
              + " fileType = " + getFileType(fileLength)
              + ", dnFailureNum = " + dnFailureNum;
          LOG.error(msg, ioe);
          // Propagate so a read/decoding failure fails the test instead of
          // only being logged.
          throw new IOException(msg, ioe);
        } finally {
          // tear down the cluster
          tearDown();
        }
      }
    }
  }

  /**
   * Corrupt a tolerable number of internal blocks before reading.
   * Verify the decoding works correctly.
   */
  @Test(timeout=300000)
  public void testReadCorruptedData() throws IOException {
    testReadWithEveryTolerableLoss("/corrupted_", false);
  }

  /**
   * Delete a tolerable number of internal block files before reading.
   * Verify the decoding works correctly.
   */
  @Test(timeout=300000)
  public void testReadCorruptedDataByDeleting() throws IOException {
    testReadWithEveryTolerableLoss("/deleted_", true);
  }

  /**
   * For every file length, write a file, lose some of its internal blocks and
   * verify it still reads back correctly. Every combination of at least one
   * data block plus zero or more parity blocks, with at most
   * {@code parityBlocks} blocks lost in total, is tried. At least one data
   * block is always lost so that reads need reconstruction.
   *
   * @param srcPrefix path prefix for the test files
   * @param deleteBlockFile true to delete block files, false to corrupt them
   */
  private void testReadWithEveryTolerableLoss(String srcPrefix,
      boolean deleteBlockFile) throws IOException {
    for (int fileLength : fileLengths) {
      for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
        for (int parityDelNum = 0; dataDelNum + parityDelNum <= parityBlocks;
            parityDelNum++) {
          String src = srcPrefix + dataDelNum + "_" + parityDelNum + "_"
              + getFileType(fileLength);
          testReadWithBlockCorrupted(src, fileLength,
              dataDelNum, parityDelNum, deleteBlockFile);
        }
      }
    }
  }

  /** Label that distinguishes the two test file sizes in paths and logs. */
  private static String getFileType(int fileLength) {
    return fileLength < (blockSize * dataBlocks) ? "smallFile" : "largeFile";
  }

  /**
   * Find the datanode listed first for the first block location of
   * {@code file} within {@code [0, length)}.
   *
   * @return index into {@code cluster.getDataNodes()}, or -1 if not found
   */
  private int findFirstDataNode(Path file, long length) throws IOException {
    BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
    String name = (locs[0].getNames())[0];
    return findDataNodeIndex(name);
  }

  /**
   * Map a block location name ({@code host:port}, as returned by
   * {@link BlockLocation#getNames()}) to the datanode with that transfer port.
   * The port is matched as a ":port" suffix rather than as a substring, so
   * e.g. port 1234 cannot match a name ending in ":41234".
   *
   * @return index into {@code cluster.getDataNodes()}, or -1 if not found
   */
  private int findDataNodeIndex(String name) {
    int dnIndex = 0;
    for (DataNode dn : cluster.getDataNodes()) {
      if (name.endsWith(":" + dn.getXferPort())) {
        return dnIndex;
      }
      dnIndex++;
    }
    return -1;
  }

  /**
   * Check the file's length, positional read, stateful read (byte[] and
   * ByteBuffer) and seek against the expected content.
   */
  private void verifyRead(Path testPath, int length, byte[] expected)
      throws IOException {
    byte[] buffer = new byte[length + 100];
    StripedFileTestUtil.verifyLength(fs, testPath, length);
    StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
        ByteBuffer.allocate(length + 100));
    StripedFileTestUtil.verifySeek(fs, testPath, length);
  }

  /**
   * Write a file of {@code fileLength} bytes, shut down {@code dnFailureNum}
   * datanodes holding its internal blocks, then verify that it still reads back
   * correctly.
   */
  private void testReadWithDNFailure(int fileLength, int dnFailureNum)
      throws IOException {
    String fileType = getFileType(fileLength);
    String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
    LOG.info("testReadWithDNFailure: file = " + src
        + ", fileSize = " + fileLength
        + ", dnFailureNum = " + dnFailureNum);

    Path testPath = new Path(src);
    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
    DFSTestUtil.writeFile(fs, testPath, bytes);

    // Shut down the first dnFailureNum datanodes listed for the block location
    // covering offset cellSize * 5.
    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
        cellSize);
    String[] names = locs[0].getNames();
    for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
      String name = names[failedDnIdx];
      int dnIndex = findDataNodeIndex(name);
      // Without a match nothing would be shut down and the test would
      // silently exercise a healthy cluster.
      Assert.assertNotEquals("No datanode found for " + name, -1, dnIndex);
      cluster.getDataNodes().get(dnIndex).shutdown();
    }

    // check file length, pread, stateful read and seek
    verifyRead(testPath, fileLength, bytes);
  }

  /**
   * After reading a corrupted block, make sure the client can correctly report
   * the corruption to the NameNode.
   */
  @Test
  public void testReportBadBlock() throws IOException {
    // create file
    final Path file = new Path("/corrupted");
    final int length = 10; // length of "corruption"
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(fs, file, bytes);

    // corrupt the first data block
    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
    Assert.assertNotEquals(-1, dnIndex);
    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
        cellSize, dataBlocks, parityBlocks);
    // find the first block file
    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
    Assert.assertTrue("Block file does not exist", blkFile.exists());
    // corrupt the block file
    LOG.info("Deliberately corrupting file " + blkFile.getName());
    try (FileOutputStream out = new FileOutputStream(blkFile)) {
      out.write("corruption".getBytes());
    }

    // disable the heartbeat from DN so that the corrupted block record is kept
    // in NameNode
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
    }

    try {
      // do stateful read
      StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
          ByteBuffer.allocate(1024));

      // check whether the corruption has been reported to the NameNode
      final FSNamesystem ns = cluster.getNamesystem();
      final BlockManager bm = ns.getBlockManager();
      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
          .asFile().getBlocks())[0];
      Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
    } finally {
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
      }
    }
  }

  /**
   * Delete a striped file while heartbeats are disabled on the datanode that
   * holds its first internal block. Then check that the block is recorded for
   * invalidation, either in the BlockManager or on that node's
   * DatanodeDescriptor.
   */
  @Test
  public void testInvalidateBlock() throws IOException {
    final Path file = new Path("/invalidate");
    final int length = 10;
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(fs, file, bytes);

    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
    Assert.assertNotEquals(-1, dnIndex);
    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
        cellSize, dataBlocks, parityBlocks);
    final Block b = blks[0].getBlock().getLocalBlock();

    DataNode dn = cluster.getDataNodes().get(dnIndex);
    // disable the heartbeat from DN so that the invalidated block record is kept
    // in NameNode until heartbeat expires and NN mark the dn as dead
    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);

    try {
      // delete the file
      fs.delete(file, true);
      // check the block is added to invalidateBlocks
      final FSNamesystem fsn = cluster.getNamesystem();
      final BlockManager bm = fsn.getBlockManager();
      DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
      Assert.assertTrue(bm.containsInvalidateBlock(
          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
    } finally {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
    }
  }

  /**
   * Test reading a file with some blocks(data blocks or parity blocks or both)
   * deleted or corrupted.
   * @param src file path
   * @param fileLength file length
   * @param dataBlkDelNum the deleted or corrupted number of data blocks.
   * @param parityBlkDelNum the deleted or corrupted number of parity blocks.
   * @param deleteBlockFile whether block file is deleted or corrupted.
   *                        true is to delete the block file.
   *                        false is to corrupt the content of the block file.
   * @throws IOException
   */
  private void testReadWithBlockCorrupted(String src, int fileLength,
      int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
      throws IOException {
    LOG.info("testReadWithBlockCorrupted: file = " + src
        + ", fileSize = " + fileLength
        + ", dataBlkDelNum = " + dataBlkDelNum
        + ", parityBlkDelNum = " + parityBlkDelNum
        + ", deleteBlockFile? " + deleteBlockFile);
    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
    Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be non-negative",
        dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
    Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
        "should be between 1 ~ " + parityBlocks,
        recoverBlkNum >= 1 && recoverBlkNum <= parityBlocks);

    // write a file with the length of writeLen
    Path srcPath = new Path(src);
    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
    DFSTestUtil.writeFile(fs, srcPath, bytes);

    // delete or corrupt some blocks
    corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);

    // check the file can be read after some blocks were deleted
    verifyRead(srcPath, fileLength, bytes);
  }

  /**
   * Delete or corrupt randomly chosen internal blocks of the file's LAST block
   * group: {@code dataBlkDelNum} distinct data block indices in
   * {@code [0, dataBlocks)} and {@code parityBlkDelNum} distinct parity block
   * indices in {@code [dataBlocks, dataBlocks + parityBlocks)}. Earlier block
   * groups are left intact.
   */
  private void corruptBlocks(Path srcPath, int dataBlkDelNum,
      int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;

    LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
    LocatedStripedBlock lastBlock =
        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();

    int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
        dataBlkDelNum);
    Assert.assertNotNull(delDataBlkIndices);
    int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
        dataBlocks + parityBlocks, parityBlkDelNum);
    Assert.assertNotNull(delParityBlkIndices);

    int[] delBlkIndices = new int[recoverBlkNum];
    System.arraycopy(delDataBlkIndices, 0,
        delBlkIndices, 0, delDataBlkIndices.length);
    System.arraycopy(delParityBlkIndices, 0,
        delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);

    for (int delBlkIndex : delBlkIndices) {
      ExtendedBlock delBlock = StripedBlockUtil
          .constructInternalBlock(lastBlock.getBlock(),
              cellSize, dataBlocks, delBlkIndex);
      if (deleteBlockFile) {
        // delete the block file
        cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlock);
      } else {
        // corrupt the block file
        cluster.corruptBlockOnDataNodes(delBlock);
      }
    }
  }

  /** Fetch the located blocks covering the whole file. */
  private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
    return fs.getClient().getLocatedBlocks(filePath.toString(),
        0, Long.MAX_VALUE);
  }
}