/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
* Test reading a striped file when some of its blocks are missing (not included
* in the block locations returned by the NameNode).
*/
public class TestReadStripedFileWithMissingBlocks {
public static final Logger LOG = LoggerFactory
.getLogger(TestReadStripedFileWithMissingBlocks.class);
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient dfsClient;
private Configuration conf = new HdfsConfiguration();
private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
private final int cellSize = ecPolicy.getCellSize();
  private final int stripesPerBlock = 4;
  private final int blockSize = stripesPerBlock * cellSize;
private final int blockGroupSize = blockSize * dataBlocks;
  // Start the cluster with two extra datanodes so some can be stopped;
  // at least dataBlocks + parityBlocks of them must stay up for the
  // test to pass.
  private final int numDNs = dataBlocks + parityBlocks + 2;
private final int fileLength = blockSize * dataBlocks + 123;

  @Rule
  public Timeout globalTimeout = new Timeout(300000);

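  // setup() and tearDown() are called explicitly from the test body
  // (rather than via @Before/@After) so that the cluster lifecycle is
  // scoped by the test's try/finally block.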
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
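    // Limit NameNode replication/reconstruction streams to zero so blocks
    // on stopped datanodes are not reconstructed during the test.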
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
    fs = cluster.getFileSystem();
    // The policy must be enabled before it can be set on a directory.
    fs.enableErasureCodingPolicy(ecPolicy.getName());
    fs.getClient().setErasureCodingPolicy("/", ecPolicy.getName());
    dfsClient = new DFSClient(cluster.getNameNode(0).getNameNodeAddress(),
        conf);
}

  public void tearDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}

  @Test
public void testReadFileWithMissingBlocks() throws Exception {
try {
setup();
Path srcPath = new Path("/foo");
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
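      // The file spans one full block group plus 123 bytes of a second one.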
DFSTestUtil.writeFile(fs, srcPath, new String(expected));
StripedFileTestUtil
.waitBlockGroupsReported(fs, srcPath.toUri().getPath());
StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
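      // Try every (data, parity) failure combination the policy tolerates:
      // at most parityBlocks blocks of the group are stopped in total.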
for (int missingData = 1; missingData <= dataBlocks; missingData++) {
for (int missingParity = 0; missingParity <=
parityBlocks - missingData; missingParity++) {
readFileWithMissingBlocks(srcPath, fileLength, missingData,
missingParity, expected);
}
}
} finally {
tearDown();
}
}

  private void readFileWithMissingBlocks(Path srcPath, int fileLength,
int missingDataNum, int missingParityNum, byte[] expected)
throws Exception {
LOG.info("readFileWithMissingBlocks: (" + missingDataNum + ","
+ missingParityNum + ")");
int dataBlocks = (fileLength - 1) / cellSize + 1;
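    // Location list for the first block group: data blocks first, then
    // the parity blocks.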
BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize);
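    // Indices within the block group to knock out: the first missingDataNum
    // data blocks, then the first missingParityNum parity blocks.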
int[] missingDataNodes = new int[missingDataNum + missingParityNum];
for (int i = 0; i < missingDataNum; i++) {
missingDataNodes[i] = i;
}
for (int i = 0; i < missingParityNum; i++) {
missingDataNodes[i + missingDataNum] = i +
Math.min(ecPolicy.getNumDataUnits(), dataBlocks);
}
stopDataNodes(locs, missingDataNodes);
    // Make sure some block locations are actually missing now.
BlockLocation[] newLocs = fs.getFileBlockLocations(srcPath, 0, cellSize);
Assert.assertTrue(
newLocs[0].getNames().length < locs[0].getNames().length);
byte[] smallBuf = new byte[1024];
byte[] largeBuf = new byte[fileLength + 100];
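    // Reads must still succeed, reconstructing the missing data from the
    // remaining data and parity blocks.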
StripedFileTestUtil.verifySeek(fs, srcPath, fileLength, ecPolicy,
blockGroupSize);
StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
smallBuf);
StripedFileTestUtil
.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
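    // Restore the stopped datanodes so the next (data, parity) combination
    // starts from a fully healthy cluster.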
restartDeadDataNodes();
}

  private void restartDeadDataNodes() throws IOException {
DatanodeInfo[] deadNodes = dfsClient
.datanodeReport(DatanodeReportType.DEAD);
for (DatanodeInfo dnInfo : deadNodes) {
cluster.restartDataNode(dnInfo.getXferAddr());
}
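    // Trigger heartbeats so the NameNode notices the restarted datanodes
    // promptly.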
cluster.triggerHeartbeats();
}

  private void stopDataNodes(BlockLocation[] locs, int[] datanodes)
throws IOException {
if (locs != null && locs.length > 0) {
for (int failedDNIdx : datanodes) {
String name = (locs[0].getNames())[failedDNIdx];
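        // Location names are "host:port"; match the datanode by its data
        // transfer port, since all MiniDFSCluster datanodes share one host.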
for (DataNode dn : cluster.getDataNodes()) {
int port = dn.getXferPort();
if (name.contains(Integer.toString(port))) {
dn.shutdown();
cluster.setDataNodeDead(dn.getDatanodeId());
LOG.info("stop datanode " + failedDNIdx);
break;
}
}
}
}
}
}