/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumCombineMode;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.slf4j.event.Level;
import java.io.IOException;
import java.util.Random;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
/**
* Tests file checksum computation for replicated and striped (erasure-coded)
* files. It creates files from the same data, one in replicated layout and the
* others in striped layout, and compares their checksums. For simplicity, it
* assumes 6 data blocks per block group and the same block size for both
* layouts.
*/
@RunWith(Parameterized.class)
public class TestFileChecksum {
private static final Logger LOG = LoggerFactory
.getLogger(TestFileChecksum.class);
private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
private int dataBlocks = ecPolicy.getNumDataUnits();
private int parityBlocks = ecPolicy.getNumParityUnits();
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private Configuration conf;
private DFSClient client;
private int cellSize = ecPolicy.getCellSize();
private int stripesPerBlock = 6;
private int blockSize = cellSize * stripesPerBlock;
private int numBlockGroups = 10;
private int stripSize = cellSize * dataBlocks;
private int blockGroupSize = stripesPerBlock * stripSize;
private int fileSize = numBlockGroups * blockGroupSize;
private int bytesPerCRC;
private String ecDir = "/striped";
private String stripedFile1 = ecDir + "/stripedFileChecksum1";
private String stripedFile2 = ecDir + "/stripedFileChecksum2";
private String replicatedFile = "/replicatedFileChecksum";
private String checksumCombineMode;
public TestFileChecksum(String checksumCombineMode) {
this.checksumCombineMode = checksumCombineMode;
}
@Parameterized.Parameters
public static Object[] getParameters() {
return new Object[] {
ChecksumCombineMode.MD5MD5CRC.name(),
ChecksumCombineMode.COMPOSITE_CRC.name()};
}
@Rule
public ExpectedException exception = ExpectedException.none();
@Before
public void setup() throws IOException {
int numDNs = dataBlocks + parityBlocks + 2;
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.set(HdfsClientConfigKeys.DFS_CHECKSUM_COMBINE_MODE_KEY,
checksumCombineMode);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
Path ecPath = new Path(ecDir);
cluster.getFileSystem().mkdir(ecPath, FsPermission.getDirDefault());
cluster.getFileSystem().getClient().setErasureCodingPolicy(ecDir,
StripedFileTestUtil.getDefaultECPolicy().getName());
fs = cluster.getFileSystem();
client = fs.getClient();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
bytesPerCRC = conf.getInt(
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
GenericTestUtils.setLogLevel(FileChecksumHelper.LOG, Level.DEBUG);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
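// The testStripedFileChecksum1..7 cases below compare checksums of two
// identical striped files over a range of requested lengths: zero, around a
// stripe boundary, around a block group boundary, and the whole file.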
@Test(timeout = 90000)
public void testStripedFileChecksum1() throws Exception {
int length = 0;
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length + 10);
}
@Test(timeout = 90000)
public void testStripedFileChecksum2() throws Exception {
int length = stripSize - 1;
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
@Test(timeout = 90000)
public void testStripedFileChecksum3() throws Exception {
int length = stripSize;
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
@Test(timeout = 90000)
public void testStripedFileChecksum4() throws Exception {
int length = stripSize + cellSize * 2;
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
@Test(timeout = 90000)
public void testStripedFileChecksum5() throws Exception {
int length = blockGroupSize;
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
@Test(timeout = 90000)
public void testStripedFileChecksum6() throws Exception {
int length = blockGroupSize + blockSize;
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, length - 10);
}
@Test(timeout = 90000)
public void testStripedFileChecksum7() throws Exception {
int length = -1; // whole file
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
testStripedFileChecksum(length, fileSize);
}
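/**
* Computes checksums of the two identical striped files over range1, and of
* the second file over range2. Checksums over the same range must match; when
* range1 is valid and differs from range2, the checksums must differ.
*/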
private void testStripedFileChecksum(int range1, int range2)
throws Exception {
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
range1, false);
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2,
range1, false);
FileChecksum stripedFileChecksum3 = getFileChecksum(stripedFile2,
range2, false);
LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
LOG.info("stripedFileChecksum2:" + stripedFileChecksum2);
LOG.info("stripedFileChecksum3:" + stripedFileChecksum3);
Assert.assertTrue(stripedFileChecksum1.equals(stripedFileChecksum2));
if (range1 >= 0 && range1 != range2) {
Assert.assertFalse(stripedFileChecksum1.equals(stripedFileChecksum3));
}
}
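/**
* A striped file and a replicated file written from the same data should have
* equal checksums only in COMPOSITE_CRC mode; MD5MD5CRC is layout-dependent
* and is expected to differ.
*/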
@Test(timeout = 90000)
public void testStripedAndReplicatedFileChecksum() throws Exception {
prepareTestFiles(fileSize, new String[] {stripedFile1, replicatedFile});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1,
10, false);
FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile,
10, false);
if (checksumCombineMode.equals(ChecksumCombineMode.COMPOSITE_CRC.name())) {
Assert.assertEquals(stripedFileChecksum1, replicatedFileChecksum);
} else {
Assert.assertNotEquals(stripedFileChecksum1, replicatedFileChecksum);
}
}
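/**
* Two replicated files with identical data but different block sizes should
* have equal checksums only under COMPOSITE_CRC; MD5MD5CRC depends on block
* boundaries and is expected to differ.
*/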
@Test(timeout = 90000)
public void testDifferentBlockSizeReplicatedFileChecksum() throws Exception {
byte[] fileData = StripedFileTestUtil.generateBytes(fileSize);
String replicatedFile1 = "/replicatedFile1";
String replicatedFile2 = "/replicatedFile2";
DFSTestUtil.writeFile(
fs, new Path(replicatedFile1), fileData, blockSize);
DFSTestUtil.writeFile(
fs, new Path(replicatedFile2), fileData, blockSize / 2);
FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
if (checksumCombineMode.equals(ChecksumCombineMode.COMPOSITE_CRC.name())) {
Assert.assertEquals(checksum1, checksum2);
} else {
Assert.assertNotEquals(checksum1, checksum2);
}
}
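/**
* The checksum computed after shutting down a datanode that hosts part of the
* striped file (forcing reconstruction) must match the checksum computed with
* all datanodes alive.
*/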
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception {
prepareTestFiles(fileSize, new String[] {stripedFile1});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize,
false);
FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1,
fileSize, true);
LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon);
Assert.assertTrue("Checksum mismatches!",
stripedFileChecksum1.equals(stripedFileChecksumRecon));
}
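/**
* Same as the previous test, but also verifies that the reconstructed
* checksum of the second file matches the checksum of an identical healthy
* file.
*/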
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception {
prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1,
false);
FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1,
false);
FileChecksum stripedFileChecksum2Recon = getFileChecksum(stripedFile2, -1,
true);
LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
LOG.info("stripedFileChecksum2:" + stripedFileChecksum1);
LOG.info("stripedFileChecksum2Recon:" + stripedFileChecksum2Recon);
Assert.assertTrue("Checksum mismatches!",
stripedFileChecksum1.equals(stripedFileChecksum2));
Assert.assertTrue("Checksum mismatches!",
stripedFileChecksum1.equals(stripedFileChecksum2Recon));
Assert.assertTrue("Checksum mismatches!",
stripedFileChecksum2.equals(stripedFileChecksum2Recon));
}
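/**
* Computes the checksum over the requested length twice, once with all
* datanodes alive and once after shutting down a datanode that hosts the
* first block, and verifies both results match.
*/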
private void testStripedFileChecksumWithMissedDataBlocksRangeQuery(
String stripedFile, int requestedLen) throws Exception {
LOG.info("Checksum file:{}, requested length:{}", stripedFile,
requestedLen);
prepareTestFiles(fileSize, new String[] {stripedFile});
FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile,
requestedLen, false);
FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile,
requestedLen, true);
LOG.info("stripedFileChecksum1:" + stripedFileChecksum1);
LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon);
Assert.assertTrue("Checksum mismatches!",
stripedFileChecksum1.equals(stripedFileChecksumRecon));
}
/**
* Test to verify that the checksum can be computed for a requested range
* smaller than bytesPerCRC (a single byte).
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery1()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 1);
}
/**
* Test to verify that the checksum can be computed for a requested range
* smaller than bytesPerCRC (10 bytes).
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery2()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 10);
}
/**
* Test to verify that the checksum can be computed by giving bytesPerCRC
* length of file range for checksum calculation. 512 is the value of
* bytesPerCRC.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery3()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
bytesPerCRC);
}
/**
* Test to verify that the checksum can be computed by giving 'cellsize'
* length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery4()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
cellSize);
}
/**
* Test to verify that the checksum can be computed by giving less than
* cellsize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery5()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
cellSize - 1);
}
/**
* Test to verify that the checksum can be computed by giving greater than
* cellsize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery6()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
cellSize + 1);
}
/**
* Test to verify that the checksum can be computed by giving two times
* cellsize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery7()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
cellSize * 2);
}
/**
* Test to verify that the checksum can be computed by giving stripSize
* length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery8()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
stripSize);
}
/**
* Test to verify that the checksum can be computed by giving less than
* stripSize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery9()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
stripSize - 1);
}
/**
* Test to verify that the checksum can be computed by giving greater than
* stripSize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery10()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
stripSize + 1);
}
/**
* Test to verify that the checksum can be computed by giving less than
* blockGroupSize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery11()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
blockGroupSize - 1);
}
/**
* Test to verify that the checksum can be computed by giving greater than
* blockGroupSize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery12()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
blockGroupSize + 1);
}
/**
* Test to verify that the checksum can be computed by giving half of the file
* size (multiple block groups) as the file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery13()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
blockGroupSize * numBlockGroups / 2);
}
/**
* Test to verify that the checksum can be computed by giving less than
* fileSize length of file range for checksum calculation.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery14()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
fileSize - 1);
}
/**
* Test to verify that the checksum can be computed for a length greater than
* file size.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery15()
throws Exception {
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1,
fileSize * 2);
}
/**
* Test to verify that the checksum can be computed for a small file less than
* bytesPerCRC size.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery16()
throws Exception {
int fileLength = 100;
String stripedFile3 = ecDir + "/stripedFileChecksum3";
prepareTestFiles(fileLength, new String[] {stripedFile3});
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
fileLength - 1);
}
/**
* Test to verify that the checksum can be computed for a small file less than
* bytesPerCRC size.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery17()
throws Exception {
int fileLength = 100;
String stripedFile3 = ecDir + "/stripedFileChecksum3";
prepareTestFiles(fileLength, new String[] {stripedFile3});
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 1);
}
/**
* Test to verify that the checksum can be computed for a small file less than
* bytesPerCRC size.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery18()
throws Exception {
int fileLength = 100;
String stripedFile3 = ecDir + "/stripedFileChecksum3";
prepareTestFiles(fileLength, new String[] {stripedFile3});
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 10);
}
/**
* Test to verify that the checksum can be computed when the requested length
* is greater than the file length.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery19()
throws Exception {
int fileLength = 100;
String stripedFile3 = ecDir + "/stripedFileChecksum3";
prepareTestFiles(fileLength, new String[] {stripedFile3});
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
fileLength * 2);
}
/**
* Test to verify that the checksum can be computed for a small file when the
* requested length is less than the file length.
*/
@Test(timeout = 90000)
public void testStripedFileChecksumWithMissedDataBlocksRangeQuery20()
throws Exception {
int fileLength = bytesPerCRC;
String stripedFile3 = ecDir + "/stripedFileChecksum3";
prepareTestFiles(fileLength, new String[] {stripedFile3});
testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3,
bytesPerCRC - 1);
}
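/**
* Verifies that the checksum is still correct when the first block checksum
* reconstruction attempt fails: the fault injector throws once, so the client
* has to fall back to another datanode of the block group.
*/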
@Test(timeout = 90000)
public void testStripedFileChecksumWithReconstructFail()
throws Exception {
String stripedFile4 = ecDir + "/stripedFileChecksum4";
prepareTestFiles(fileSize, new String[] {stripedFile4});
// get checksum
FileChecksum fileChecksum = getFileChecksum(stripedFile4, -1, false);
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector newInjector = mock(DataNodeFaultInjector.class);
doThrow(new IOException())
.doNothing()
.when(newInjector)
.stripedBlockChecksumReconstruction();
DataNodeFaultInjector.set(newInjector);
try {
// Get checksum again with reconstruction.
// If the reconstruction task fails, the client tries to get the checksum
// from another DN that has a block of the block group, since the first
// attempt did not return a result.
FileChecksum fileChecksum1 = getFileChecksum(stripedFile4, -1, true);
Assert.assertEquals("checksum should be same", fileChecksum,
fileChecksum1);
} finally {
DataNodeFaultInjector.set(oldInjector);
}
}
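/**
* Builds a replicated file whose blocks were written with different
* bytes-per-checksum values. COMPOSITE_CRC can still combine the block
* checksums, while MD5MD5CRC is expected to fail with an IOException.
*/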
@Test(timeout = 90000)
public void testMixedBytesPerChecksum() throws Exception {
int fileLength = bytesPerCRC * 3;
byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
String replicatedFile1 = "/replicatedFile1";
// Split file into two parts.
byte[] fileDataPart1 = new byte[bytesPerCRC * 2];
System.arraycopy(fileData, 0, fileDataPart1, 0, fileDataPart1.length);
byte[] fileDataPart2 = new byte[fileData.length - fileDataPart1.length];
System.arraycopy(
fileData, fileDataPart1.length, fileDataPart2, 0, fileDataPart2.length);
DFSTestUtil.writeFile(fs, new Path(replicatedFile1), fileDataPart1);
// Modify bytesPerCRC for second part that we append as separate block.
conf.setInt(
HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesPerCRC / 2);
DFSTestUtil.appendFileNewBlock(
((DistributedFileSystem) FileSystem.newInstance(conf)),
new Path(replicatedFile1), fileDataPart2);
if (checksumCombineMode.equals(ChecksumCombineMode.COMPOSITE_CRC.name())) {
String replicatedFile2 = "/replicatedFile2";
DFSTestUtil.writeFile(fs, new Path(replicatedFile2), fileData);
FileChecksum checksum1 = getFileChecksum(replicatedFile1, -1, false);
FileChecksum checksum2 = getFileChecksum(replicatedFile2, -1, false);
Assert.assertEquals(checksum1, checksum2);
} else {
exception.expect(IOException.class);
FileChecksum checksum = getFileChecksum(replicatedFile1, -1, false);
}
}
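/**
* Queries the checksum of the given file. A non-negative range limits the
* checksum to the first {@code range} bytes; otherwise the whole file is
* covered. If {@code killDn} is true, a datanode hosting the first block is
* shut down before the query and restarted afterwards.
*/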
private FileChecksum getFileChecksum(String filePath, int range,
boolean killDn) throws Exception {
int dnIdxToDie = -1;
if (killDn) {
dnIdxToDie = getDataNodeToKill(filePath);
DataNode dnToDie = cluster.getDataNodes().get(dnIdxToDie);
shutdownDataNode(dnToDie);
}
Path testPath = new Path(filePath);
FileChecksum fc;
if (range >= 0) {
fc = fs.getFileChecksum(testPath, range);
} else {
fc = fs.getFileChecksum(testPath);
}
if (dnIdxToDie != -1) {
cluster.restartDataNode(dnIdxToDie);
}
return fc;
}
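/**
* Writes the same generated test data of the given length to every path so
* that the resulting files are byte-for-byte identical.
*/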
private void prepareTestFiles(int fileLength, String[] filePaths)
throws IOException {
byte[] fileData = StripedFileTestUtil.generateBytes(fileLength);
for (String filePath : filePaths) {
Path testPath = new Path(filePath);
DFSTestUtil.writeFile(fs, testPath, fileData);
}
}
void shutdownDataNode(DataNode dataNode) throws IOException {
/*
* Kill the datanode that contains one replica.
* We need to make sure the namenode sees it as dead: clear its update time
* and trigger the NN to check heartbeats.
*/
dataNode.shutdown();
cluster.setDataNodeDead(dataNode.getDatanodeId());
}
/**
* Determine a datanode that hosts the first block of the file, chosen at
* random from that block's locations, and return its index in the cluster's
* datanode list (or -1 if it cannot be found).
*/
int getDataNodeToKill(String filePath) throws IOException {
LocatedBlocks locatedBlocks = client.getLocatedBlocks(filePath, 0);
LocatedBlock locatedBlock = locatedBlocks.get(0);
DatanodeInfo[] datanodes = locatedBlock.getLocations();
DatanodeInfo chosenDn = datanodes[new Random().nextInt(datanodes.length)];
int idx = 0;
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getInfoPort() == chosenDn.getInfoPort()) {
return idx;
}
idx++;
}
return -1;
}
}