blob: f12f6f59f8282c2e315f0b4865440bbc348982a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
/**
* Test if FSNamesystem handles heartbeat right
*/
public class TestHeartbeatHandling {
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
/**
* Test if
* {@link FSNamesystem#handleHeartbeat}
* can pick up replication and/or invalidate requests and observes the max
* limit
*/
@Test
public void testHeartbeat() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final String storageID = DatanodeStorage.generateUuid();
dd.updateStorage(new DatanodeStorage(storageID));
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
final DatanodeStorageInfo[] ONE_TARGET = {dd.getStorageInfo(storageID)};
try {
namesystem.writeLock();
synchronized(hm) {
for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
dd.addBlockToBeReplicated(
new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP),
ONE_TARGET);
}
DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd,
namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
}
dd.addBlocksToBeInvalidated(blockList);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(2, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(2, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[0].getAction());
assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem)
.getCommands();
assertEquals(0, cmds.length);
}
} finally {
namesystem.writeUnlock();
}
} finally {
cluster.shutdown();
}
}
/**
* Test if
* {@link FSNamesystem#handleHeartbeat}
* correctly selects data node targets for block recovery.
*/
@Test
public void testHeartbeatBlockRecovery() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg1 =
InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
dd1.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
final DatanodeRegistration nodeReg2 =
InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
dd2.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
final DatanodeRegistration nodeReg3 =
InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
dd3.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
try {
namesystem.writeLock();
synchronized(hm) {
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
// Test with all alive nodes.
DFSTestUtil.resetLastUpdatesWithOffset(dd1, 0);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, 0);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
final DatanodeStorageInfo[] storages = {
dd1.getStorageInfos()[0],
dd2.getStorageInfos()[0],
dd3.getStorageInfos()[0]};
BlockInfo blockInfo = new BlockInfoContiguous(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
storages);
dd1.addBlockToBeRecovered(blockInfo);
DatanodeCommand[] cmds =
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
assertEquals(3, recoveringNodes.length);
assertEquals(recoveringNodes[0], dd1);
assertEquals(recoveringNodes[1], dd2);
assertEquals(recoveringNodes[2], dd3);
// Test with one stale node.
DFSTestUtil.resetLastUpdatesWithOffset(dd1, 0);
// More than the default stale interval of 30 seconds.
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
blockInfo = new BlockInfoContiguous(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
storages);
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
assertEquals(2, recoveringNodes.length);
// dd2 is skipped.
assertEquals(recoveringNodes[0], dd1);
assertEquals(recoveringNodes[1], dd3);
// Test with all stale node.
DFSTestUtil.resetLastUpdatesWithOffset(dd1, - 60 * 1000);
// More than the default stale interval of 30 seconds.
DFSTestUtil.resetLastUpdatesWithOffset(dd2, - 40 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, - 80 * 1000);
blockInfo = new BlockInfoContiguous(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_RECOVERY,
storages);
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
// Only dd1 is included since it heart beated and hence its not stale
// when the list of recovery blocks is constructed.
assertEquals(3, recoveringNodes.length);
assertEquals(recoveringNodes[0], dd1);
assertEquals(recoveringNodes[1], dd2);
assertEquals(recoveringNodes[2], dd3);
}
} finally {
namesystem.writeUnlock();
}
} finally {
cluster.shutdown();
}
}
@Test
public void testHeartbeatStopWatch() throws Exception {
Namesystem ns = Mockito.mock(Namesystem.class);
BlockManager bm = Mockito.mock(BlockManager.class);
Configuration conf = new Configuration();
long recheck = 2000;
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck);
HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf);
monitor.restartHeartbeatStopWatch();
assertFalse(monitor.shouldAbortHeartbeatCheck(0));
// sleep shorter than recheck and verify shouldn't abort
Thread.sleep(100);
assertFalse(monitor.shouldAbortHeartbeatCheck(0));
// sleep longer than recheck and verify should abort unless ignore delay
Thread.sleep(recheck);
assertTrue(monitor.shouldAbortHeartbeatCheck(0));
assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck*3));
// ensure it resets properly
monitor.restartHeartbeatStopWatch();
assertFalse(monitor.shouldAbortHeartbeatCheck(0));
}
}