/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.base.Preconditions;
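/**
 * Test striped (erasure-coded) file writes when datanodes fail in the middle
 * of the write. Files of various lengths are written while one or more
 * datanodes are killed at chosen positions, and the data is then read back
 * and verified.
 */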
public class TestDFSStripedOutputStreamWithFailure {
public static final Log LOG = LogFactory.getLog(
TestDFSStripedOutputStreamWithFailure.class);
static {
GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
.getLogger().setLevel(Level.ALL);
}
private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
private static final int STRIPES_PER_BLOCK = 4;
private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
private static final int FLUSH_POS
= 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
static {
System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS);
System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS);
System.out.println("CELL_SIZE = " + CELL_SIZE
+ " (=" + StringUtils.TraditionalBinaryPrefix.long2String(CELL_SIZE, "B", 2) + ")");
System.out.println("BLOCK_SIZE = " + BLOCK_SIZE
+ " (=" + StringUtils.TraditionalBinaryPrefix.long2String(BLOCK_SIZE, "B", 2) + ")");
System.out.println("BLOCK_GROUP_SIZE = " + BLOCK_GROUP_SIZE
+ " (=" + StringUtils.TraditionalBinaryPrefix.long2String(BLOCK_GROUP_SIZE, "B", 2) + ")");
}
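/**
 * Build the suite of file lengths to test: FLUSH_POS + 2, plus every length
 * of the form b*BLOCK_GROUP_SIZE + c*CELL_SIZE + delta with delta in
 * {-1, 0, 1}, so that cell, stripe and block group boundaries are all
 * exercised.
 */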
static List<Integer> newLengths() {
final List<Integer> lengths = new ArrayList<>();
lengths.add(FLUSH_POS + 2);
for(int b = 0; b <= 2; b++) {
for(int c = 0; c < STRIPES_PER_BLOCK*NUM_DATA_BLOCKS; c++) {
for(int delta = -1; delta <= 1; delta++) {
final int length = b*BLOCK_GROUP_SIZE + c*CELL_SIZE + delta;
System.out.println(lengths.size() + ": length=" + length
+ ", (b, c, d) = (" + b + ", " + c + ", " + delta + ")");
lengths.add(length);
}
}
}
return lengths;
}
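/**
 * Combinations of datanode indices to kill in the multiple-failure tests.
 * With NUM_DATA_BLOCKS data blocks and NUM_PARITY_BLOCKS parity blocks
 * (6 + 3 here, matching the dn < 9 loops below), indices 0-5 carry data and
 * 6-8 carry parity, so the suite covers data-only, parity-only and mixed
 * failures of two or three datanodes.
 */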
private static final int[][] dnIndexSuite = {
{0, 1},
{0, 5},
{0, 6},
{0, 8},
{1, 5},
{1, 6},
{6, 8},
{0, 1, 2},
{3, 4, 5},
{0, 1, 6},
{0, 5, 6},
{0, 5, 8},
{0, 6, 7},
{5, 6, 7},
{6, 7, 8},
};
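/** Return {@code num} kill positions spread evenly over a file of the given length. */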
private int[] getKillPositions(int fileLen, int num) {
int[] positions = new int[num];
for (int i = 0; i < num; i++) {
positions[i] = fileLen * (i + 1) / (num + 1);
}
return positions;
}
private static final List<Integer> LENGTHS = newLengths();
static int getLength(int i) {
return LENGTHS.get(i);
}
private MiniDFSCluster cluster;
private DistributedFileSystem dfs;
private final Path dir = new Path("/"
+ TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
private final Random random = new Random();
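/**
 * Start a MiniDFSCluster with exactly NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS
 * datanodes and enable the default erasure coding policy on the test
 * directory.
 */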
private void setup(Configuration conf) throws IOException {
final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
dfs.mkdirs(dir);
dfs.setErasureCodingPolicy(dir, null);
}
private void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
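/**
 * Common test configuration: small blocks so block groups fill quickly,
 * block placement that ignores datanode load, fast heartbeats, and zero
 * replication streams, presumably so that background recovery work does not
 * interfere with the failure injection.
 */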
private HdfsConfiguration newHdfsConfiguration() {
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
false);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
return conf;
}
@Test(timeout=240000)
public void testDatanodeFailure56() throws Exception {
runTest(getLength(56));
}
@Test(timeout=240000)
public void testDatanodeFailureRandomLength() throws Exception {
int lenIndex = random.nextInt(LENGTHS.size());
LOG.info("run testMultipleDatanodeFailureRandomLength with length index: "
+ lenIndex);
runTest(getLength(lenIndex));
}
@Test(timeout=240000)
public void testMultipleDatanodeFailure56() throws Exception {
runTestWithMultipleFailure(getLength(56));
}
/**
 * Randomly pick a length and run the test with multiple datanode failures.
 * TODO: enable this later
 */
//@Test(timeout=240000)
public void testMultipleDatanodeFailureRandomLength() throws Exception {
int lenIndex = random.nextInt(LENGTHS.size());
LOG.info("run testMultipleDatanodeFailureRandomLength with length index: "
+ lenIndex);
runTestWithMultipleFailure(getLength(lenIndex));
}
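/**
 * Kill a datanode after the block token has expired: block access tokens
 * are enabled with a one-second lifetime, and the stream is flushed and the
 * test waits for expiry before each kill, which should exercise the
 * client's token re-fetch path.
 */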
@Test(timeout=240000)
public void testBlockTokenExpired() throws Exception {
final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
final HdfsConfiguration conf = newHdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
for (int dn = 0; dn < 9; dn += 2) {
try {
setup(conf);
runTest(length, new int[]{length/2}, new int[]{dn}, true);
} catch (Exception e) {
LOG.error("failed, dn=" + dn + ", length=" + length);
throw e;
} finally {
tearDown();
}
}
}
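/**
 * With fewer live datanodes than the number of data blocks, allocating a
 * block group should fail and the write is expected to throw.
 */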
@Test(timeout = 90000)
public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
try {
setup(conf);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
// Shut down some datanodes so that fewer datanodes remain than the number
// of data blocks.
int killDns = dataNodes.size() / 2;
int numDatanodes = dataNodes.size() - killDns;
for (int i = 0; i < killDns; i++) {
cluster.stopDataNode(i);
}
cluster.restartNameNodes();
cluster.triggerHeartbeats();
DatanodeInfo[] info = dfs.getClient().datanodeReport(DatanodeReportType.LIVE);
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
final Path dirFile = new Path(dir, "ecfile");
FSDataOutputStream out;
try {
out = dfs.create(dirFile, true);
out.write("something".getBytes());
out.flush();
out.close();
Assert.fail("Failed to validate available dns against blkGroupSize");
} catch (IOException ioe) {
// expected
GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" +
" namenode: blockGroupSize= 9, blocks.length= 5", ioe);
}
} finally {
tearDown();
}
}
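/**
 * With fewer datanodes than data + parity blocks, but still at least as
 * many as data blocks, the write should succeed and the file remain
 * readable even though not all parity blocks can be placed.
 */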
@Test(timeout = 90000)
public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
try {
setup(conf);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
// Shut down NUM_PARITY_BLOCKS - 1 datanodes so that not all parity blocks
// can be placed.
int killDns = (NUM_PARITY_BLOCKS - 1);
int numDatanodes = dataNodes.size() - killDns;
for (int i = 0; i < killDns; i++) {
cluster.stopDataNode(i);
}
cluster.restartNameNodes();
cluster.triggerHeartbeats();
DatanodeInfo[] info = dfs.getClient().datanodeReport(
DatanodeReportType.LIVE);
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
int fileLength = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1000;
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
LOG.info("writing finished. Seek and read the file to verify.");
StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
} finally {
tearDown();
}
}
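/** For each of the 9 datanodes in turn, write a file of the given length and kill that datanode halfway through. */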
void runTest(final int length) {
final HdfsConfiguration conf = newHdfsConfiguration();
for (int dn = 0; dn < 9; dn++) {
try {
LOG.info("runTest: dn=" + dn + ", length=" + length);
setup(conf);
runTest(length, new int[]{length/2}, new int[]{dn}, false);
} catch (Throwable e) {
final String err = "failed, dn=" + dn + ", length=" + length
+ StringUtils.stringifyException(e);
LOG.error(err);
Assert.fail(err);
} finally {
tearDown();
}
}
}
void runTestWithMultipleFailure(final int length) throws Exception {
final HdfsConfiguration conf = newHdfsConfiguration();
for (int[] dnIndex : dnIndexSuite) {
int[] killPos = getKillPositions(length, dnIndex.length);
try {
LOG.info("runTestWithMultipleFailure: length==" + length + ", killPos="
+ Arrays.toString(killPos) + ", dnIndex=" + Arrays.toString(dnIndex));
setup(conf);
runTest(length, killPos, dnIndex, false);
} catch (Throwable e) {
final String err = "failed, killPos=" + Arrays.toString(killPos)
+ ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
LOG.error(err);
throw e;
} finally {
tearDown();
}
}
}
/**
 * runTest implementation.
 * @param length file length
 * @param killPos positions at which to kill a datanode, in ascending order
 * @param dnIndex indices of the datanodes to kill, one per kill position
 * @param tokenExpire whether to wait for the block token to expire before
 *                    killing a datanode
 * @throws Exception
 */
private void runTest(final int length, final int[] killPos,
final int[] dnIndex, final boolean tokenExpire) throws Exception {
if (killPos[0] <= FLUSH_POS) {
LOG.warn("killPos=" + Arrays.toString(killPos) + " <= FLUSH_POS=" + FLUSH_POS
+ ", length=" + length + ", dnIndex=" + Arrays.toString(dnIndex));
return; //skip test
}
Preconditions.checkArgument(length > killPos[0], "length=%s <= killPos=%s",
length, killPos);
Preconditions.checkArgument(killPos.length == dnIndex.length);
final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex)
+ "len" + length + "kill" + Arrays.toString(killPos));
final String fullPath = p.toString();
LOG.info("fullPath=" + fullPath);
if (tokenExpire) {
final NameNode nn = cluster.getNameNode();
final BlockManager bm = nn.getNamesystem().getBlockManager();
final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
// set a short token lifetime (1 second)
SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
}
final AtomicInteger pos = new AtomicInteger();
final FSDataOutputStream out = dfs.create(p);
final DFSStripedOutputStream stripedOut
= (DFSStripedOutputStream)out.getWrappedStream();
long firstGS = -1; // first GS of this block group, before any block recovery
long oldGS = -1; // the old GS before the latest bump
List<Long> gsList = new ArrayList<>();
final List<DatanodeInfo> killedDN = new ArrayList<>();
int numKilled = 0;
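// Write one byte at a time; at each kill position, record the generation
// stamp, optionally wait for the block token to expire, and kill the
// scheduled datanode. gsList collects the last GS of each block group for
// verification in checkData.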
for(; pos.get() < length; ) {
final int i = pos.getAndIncrement();
if (numKilled < killPos.length && i == killPos[numKilled]) {
assertTrue(firstGS != -1);
final long gs = getGenerationStamp(stripedOut);
if (numKilled == 0) {
assertEquals(firstGS, gs);
} else {
//TODO: implement hflush/hsync and verify that gs is strictly greater than oldGS
assertTrue(gs >= oldGS);
}
oldGS = gs;
if (tokenExpire) {
DFSTestUtil.flushInternal(stripedOut);
waitTokenExpires(out);
}
killedDN.add(killDatanode(cluster, stripedOut, dnIndex[numKilled], pos));
numKilled++;
}
write(out, i);
if (i % BLOCK_GROUP_SIZE == FLUSH_POS) {
firstGS = getGenerationStamp(stripedOut);
oldGS = firstGS;
}
if (i > 0 && (i + 1) % BLOCK_GROUP_SIZE == 0) {
gsList.add(oldGS);
}
}
gsList.add(oldGS);
out.close();
assertEquals(dnIndex.length, numKilled);
StripedFileTestUtil.waitBlockGroupsReported(dfs, fullPath, numKilled);
cluster.triggerBlockReports();
StripedFileTestUtil.checkData(dfs, p, length, killedDN, gsList);
}
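/** Write the i-th test byte, wrapping any failure with the position to ease debugging. */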
static void write(FSDataOutputStream out, int i) throws IOException {
try {
out.write(StripedFileTestUtil.getByte(i));
} catch(IOException ioe) {
throw new IOException("Failed at i=" + i, ioe);
}
}
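/** Return the generation stamp of the block group currently being written. */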
static long getGenerationStamp(DFSStripedOutputStream out)
throws IOException {
final long gs = out.getBlock().getGenerationStamp();
LOG.info("getGenerationStamp returns " + gs);
return gs;
}
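/**
 * Return the single datanode of the given streamer, polling until either
 * the current block or the following block has a location; each striped
 * streamer writes to exactly one datanode.
 */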
static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
for(;;) {
DatanodeInfo[] datanodes = streamer.getNodes();
if (datanodes == null) {
// Try peeking the following block.
final LocatedBlock lb = streamer.peekFollowingBlock();
if (lb != null) {
datanodes = lb.getLocations();
}
}
if (datanodes != null) {
Assert.assertEquals(1, datanodes.length);
Assert.assertNotNull(datanodes[0]);
return datanodes[0];
}
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Assert.fail(StringUtils.stringifyException(ie));
return null;
}
}
}
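/** Stop the datanode that the given streamer is currently writing to, identified by its transfer address. */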
static DatanodeInfo killDatanode(MiniDFSCluster cluster,
DFSStripedOutputStream out, final int dnIndex, final AtomicInteger pos) {
final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
final DatanodeInfo datanode = getDatanodes(s);
LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
if (datanode != null) {
cluster.stopDataNode(datanode.getXferAddr());
}
return datanode;
}
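/** Busy-wait until the block token of the stream's current block expires. */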
private void waitTokenExpires(FSDataOutputStream out) throws IOException {
Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
while (!SecurityTestUtil.isBlockTokenExpired(token)) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
}
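/**
 * Base class for splitting the LENGTHS suite across multiple test classes.
 * A subclass is expected to end its name with a numeric suffix (e.g. a
 * hypothetical TestDFSStripedOutputStreamWithFailure020), which getBase()
 * parses as the offset into LENGTHS; test0 through test9 then cover ten
 * consecutive lengths.
 */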
public static abstract class TestBase {
static final long TIMEOUT = 240000;
int getBase() {
final String name = getClass().getSimpleName();
int i = name.length() - 1;
for(; i >= 0 && Character.isDigit(name.charAt(i)); i--);
return Integer.parseInt(name.substring(i + 1));
}
private final TestDFSStripedOutputStreamWithFailure test
= new TestDFSStripedOutputStreamWithFailure();
private void run(int offset) {
final int i = offset + getBase();
final int length = getLength(i);
System.out.println("Run test " + i + ", length=" + length);
test.runTest(length);
}
@Test(timeout=TIMEOUT) public void test0() {run(0);}
@Test(timeout=TIMEOUT) public void test1() {run(1);}
@Test(timeout=TIMEOUT) public void test2() {run(2);}
@Test(timeout=TIMEOUT) public void test3() {run(3);}
@Test(timeout=TIMEOUT) public void test4() {run(4);}
@Test(timeout=TIMEOUT) public void test5() {run(5);}
@Test(timeout=TIMEOUT) public void test6() {run(6);}
@Test(timeout=TIMEOUT) public void test7() {run(7);}
@Test(timeout=TIMEOUT) public void test8() {run(8);}
@Test(timeout=TIMEOUT) public void test9() {run(9);}
}
}