/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Supplier;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Test;
/**
* Test to ensure requests from dead datanodes are rejected by the namenode
* with appropriate exceptions/failure responses.
*/
public class TestDeadDatanode {
private static final Log LOG = LogFactory.getLog(TestDeadDatanode.class);
private MiniDFSCluster cluster;
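/**
* Shut down the mini cluster, if one was started, after each test.
*/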
@After
public void cleanup() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
/**
* Test to ensure the namenode rejects requests from a dead datanode:
* - Start a cluster.
* - Shut down the datanode and wait for it to be marked dead at the namenode.
* - Send datanode requests to the namenode and make sure they are rejected
* appropriately.
*/
@Test
public void testDeadDatanode() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
String poolId = cluster.getNamesystem().getBlockPoolId();
// wait for datanode to be marked live
DataNode dn = cluster.getDataNodes().get(0);
DatanodeRegistration reg = InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), true, 20000);
// Shutdown and wait for datanode to be marked dead
dn.shutdown();
DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), false, 20000);
DatanodeProtocol dnp = cluster.getNameNodeRpc();
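// The namenode RPC server implements DatanodeProtocol, so we can issue
// datanode-to-namenode calls directly on behalf of the dead datanode.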
ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
new Block(0),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
null) };
StorageReceivedDeletedBlocks[] storageBlocks = {
new StorageReceivedDeletedBlocks(
new DatanodeStorage(reg.getDatanodeUuid()), blocks) };
// The blockReceivedAndDeleted call from the dead datanode is processed
// asynchronously, so it is not rejected with an IOException; the node
// should nevertheless remain unregistered.
dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
BlockManager bm = cluster.getNamesystem().getBlockManager();
// IBRs are async, make sure the NN processes all of them.
bm.flushBlockOps();
assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
// Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport(
new DatanodeStorage(reg.getDatanodeUuid()),
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime(), 0L));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected
}
// Ensure heartbeat from dead datanode is rejected with a command
// that asks datanode to register again
StorageReport[] rep = { new StorageReport(
new DatanodeStorage(reg.getDatanodeUuid()),
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
assertEquals(1, cmd.length);
assertEquals(RegisterCommand.REGISTER.getAction(), cmd[0].getAction());
}
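/**
* Test that a dead datanode is never chosen as a target when the namenode
* allocates storages for a new block.
*/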
@Test
public void testDeadNodeAsBlockTarget() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
String poolId = cluster.getNamesystem().getBlockPoolId();
// wait for datanode to be marked live
DataNode dn = cluster.getDataNodes().get(0);
DatanodeRegistration reg = InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
// Get the datanode descriptor for the first datanode; it also serves as
// the client node for block placement below.
BlockManager bm = cluster.getNamesystem().getBlockManager();
DatanodeManager dm = bm.getDatanodeManager();
Node clientNode = dm.getDatanode(reg);
DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), true,
20000);
// Shutdown and wait for datanode to be marked dead
dn.shutdown();
DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), false,
20000);
// Choose targets for a new block; the dead local node must not be selected
// since it is no longer part of the cluster.
DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3,
clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7,
null);
for (DatanodeStorageInfo datanodeStorageInfo : results) {
assertFalse("Dead node should not be choosen", datanodeStorageInfo
.getDatanodeDescriptor().equals(clientNode));
}
}
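/**
* Test that cluster capacity and non-DFS used space exclude a dead datanode,
* and are counted again once the datanode re-registers and heartbeats.
*/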
@Test
public void testNonDFSUsedONDeadNodeReReg() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
3000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
6 * 1000);
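// Each of the two simulated datanodes reports a capacity of 4 * CAPACITY (20000).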
long CAPACITY = 5000L;
long[] capacities = new long[] { 4 * CAPACITY, 4 * CAPACITY };
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.simulatedCapacities(capacities).build();
long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
assertTrue(initialCapacity > 0);
DataNode dn1 = cluster.getDataNodes().get(0);
DataNode dn2 = cluster.getDataNodes().get(1);
final DatanodeDescriptor dn2Desc = cluster.getNamesystem(0)
.getBlockManager().getDatanodeManager()
.getDatanode(dn2.getDatanodeId());
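// Stop dn1's heartbeats and force the namenode to mark it dead.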
dn1.setHeartbeatsDisabledForTests(true);
cluster.setDataNodeDead(dn1.getDatanodeId());
assertEquals("Capacity shouldn't include DeadNode", dn2Desc.getCapacity(),
cluster.getNamesystem(0).getCapacityTotal());
assertEquals("NonDFS-used shouldn't include DeadNode",
dn2Desc.getNonDfsUsed(),
cluster.getNamesystem(0).getNonDfsUsedSpace());
// Wait for re-registration and heartbeat
dn1.setHeartbeatsDisabledForTests(false);
final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
.getBlockManager().getDatanodeManager()
.getDatanode(dn1.getDatanodeId());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return dn1Desc.isAlive() && dn1Desc.isHeartbeatedSinceRegistration();
}
}, 100, 5000);
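// Once dn1 has re-registered and heartbeated, its capacity and non-DFS
// used space should be counted in the cluster totals again.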
assertEquals("Capacity should be 0 after all DNs dead", initialCapacity,
cluster.getNamesystem(0).getCapacityTotal());
long nonDfsAfterReg = cluster.getNamesystem(0).getNonDfsUsedSpace();
assertEquals("NonDFS should include actual DN NonDFSUsed",
dn1Desc.getNonDfsUsed() + dn2Desc.getNonDfsUsed(), nonDfsAfterReg);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}