/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.junit.Test;

import java.util.List;

import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TestRecoverStripedBlocks {
  private final short GROUP_SIZE =
      (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
  private MiniDFSCluster cluster;
  private final Path dirPath = new Path("/dir");
  private Path filePath = new Path(dirPath, "file");
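  // Default cap on concurrent replication streams per DN; a DN whose pending
  // replication queue exceeds this is treated as busy when recovery sources
  // are chosen (unless the block is at the highest priority), as exercised
  // below.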
  private int maxReplicationStreams =
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;

  private void initConf(Configuration conf) {
    // Use a large heartbeat interval so the pending replication requests stay
    // in DatanodeDescriptor.replicateBlocks until the test inspects them.
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
    // Make sure the BlockManager can pull all blocks from
    // UnderReplicatedBlocks via chooseUnderReplicatedBlocks in one pass.
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
  }
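
  // One missing internal block, no busy DNs.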
  @Test
  public void testMissingStripedBlock() throws Exception {
    doTestMissingStripedBlock(1, 0);
  }
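
  // Two missing internal blocks plus one busy DN.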
  @Test
  public void testMissingStripedBlockWithBusyNode1() throws Exception {
    doTestMissingStripedBlock(2, 1);
  }
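
  // Three missing internal blocks plus one busy DN. Assuming the default 6+3
  // schema in StripedFileTestUtil, numOfMissed equals NUM_PARITY_BLOCKS, so
  // exactly NUM_DATA_BLOCKS internal blocks survive and the recovery is queued
  // at the highest priority (see the branch at the end of
  // doTestMissingStripedBlock).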
  @Test
  public void testMissingStripedBlockWithBusyNode2() throws Exception {
    doTestMissingStripedBlock(3, 1);
  }

  /**
   * Start GROUP_SIZE + 1 datanodes.
   * Inject striped blocks on the first GROUP_SIZE datanodes.
   * Then mark numOfBusy datanodes busy and remove numOfMissed datanodes.
   * Then trigger the BlockManager to compute recovery work, so that all
   * recovery work is scheduled on the last datanode.
   * Finally, verify the recovery work of the last datanode.
   */
  private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
      throws Exception {
    Configuration conf = new HdfsConfiguration();
    initConf(conf);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
        .build();

    try {
      cluster.waitActive();
      final int numBlocks = 4;
      DFSTestUtil.createStripedFile(cluster, filePath,
          dirPath, numBlocks, 1, true);
      // All blocks are located on the first GROUP_SIZE DNs; the last DN is
      // left empty by the createStripedFile utility.
      // Make sure the file is complete in the NN.
      final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
          .getINode4Write(filePath.toString()).asFile();
      assertFalse(fileNode.isUnderConstruction());
      assertTrue(fileNode.isStriped());
      BlockInfo[] blocks = fileNode.getBlocks();
      assertEquals(numBlocks, blocks.length);
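
      // Each block group should be striped, complete, one full stripe of data
      // in length, and stored on all GROUP_SIZE internal-block locations.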
      for (BlockInfo blk : blocks) {
        assertTrue(blk.isStriped());
        assertTrue(blk.isComplete());
        assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS,
            blk.getNumBytes());
        final BlockInfoStriped sb = (BlockInfoStriped) blk;
        assertEquals(GROUP_SIZE, sb.numNodes());
      }
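
      // Work on the first block group: fetch the storages that hold its
      // internal blocks so individual DNs can be made busy or removed below.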
      final BlockManager bm = cluster.getNamesystem().getBlockManager();
      BlockInfo firstBlock = fileNode.getBlocks()[0];
      DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
      // Make numOfBusy nodes busy.
      int i = 0;
      for (; i < numOfBusy; i++) {
        DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
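        // Queue more pending replications than maxReplicationStreams so this
        // DN is considered busy and is skipped when recovery sources are
        // chosen, unless the block sits at the highest priority.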
        for (int j = 0; j < maxReplicationStreams + 1; j++) {
          BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
              new DatanodeStorageInfo[]{storageInfos[0]});
        }
      }

      // Make numOfMissed internal blocks missing by removing their datanodes.
      for (; i < numOfBusy + numOfMissed; i++) {
        DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
        assertEquals(numBlocks, missedNode.numBlocks());
        bm.getDatanodeManager().removeDatanode(missedNode);
      }
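
      // Run one pass of the replication monitor's work computation so the NN
      // schedules EC recovery tasks for the under-replicated block groups.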
      BlockManagerTestUtil.getComputedDatanodeWork(bm);

      // All the recovery work should be scheduled on the last DN, which is
      // the only DN not already holding an internal block of the group.
      DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
      DatanodeDescriptor last =
          bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
      assertEquals("Counting the number of outstanding EC tasks", numBlocks,
          last.getNumberOfBlocksToBeErasureCoded());
      List<BlockECRecoveryInfo> recovery =
          last.getErasureCodeCommand(numBlocks);
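
      // Each BlockECRecoveryInfo describes one recovery task: the live source
      // DNs (and their internal block indices) plus the target DN that will
      // reconstruct the missing internal blocks.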
      for (BlockECRecoveryInfo info : recovery) {
        assertEquals(1, info.getTargetDnInfos().length);
        assertEquals(last, info.getTargetDnInfos()[0]);
        assertEquals(info.getSourceDnInfos().length,
            info.getLiveBlockIndices().length);
        if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) {
          // The block is in QUEUE_HIGHEST_PRIORITY, so the busy DNs are also
          // chosen as sources to make sure we have NUM_DATA_BLOCKS DNs to do
          // the recovery work.
          assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length);
        } else {
          // The block is not at the highest priority, so the busy DNs are not
          // used as sources.
          assertEquals(GROUP_SIZE - numOfMissed - numOfBusy,
              info.getSourceDnInfos().length);
        }
      }
    } finally {
      cluster.shutdown();
    }
  }
}