/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;

/**
* Test suite covering lifeline protocol handling in the DataNode.
*/
public class TestDataNodeLifeline {
private static final Logger LOG = LoggerFactory.getLogger(
TestDataNodeLifeline.class);

static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
}

@Rule
public Timeout timeout = new Timeout(60000);

private MiniDFSCluster cluster;
private HdfsConfiguration conf;
private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
private DataNodeMetrics metrics;
private DatanodeProtocolClientSideTranslatorPB namenode;
private FSNamesystem namesystem;
private DataNode dn;
private BPServiceActor bpsa;

@Before
public void setup() throws Exception {
// Configure cluster with lifeline RPC server enabled, and tune heartbeat
// timings down so that DataNodes are marked stale or dead quickly.
conf = new HdfsConfiguration();
conf.setInt(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, 2);
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 6 * 1000);
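// With these settings, the NameNode marks a DataNode stale after 6 seconds
// without a heartbeat and dead after roughly 10 seconds (DatanodeManager
// computes expiry as 2 * recheck interval + 10 * heartbeat interval), while
// the lifeline fires every 2 seconds, well inside both windows.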
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
namesystem = cluster.getNameNode().getNamesystem();
// Set up spies on RPC proxies so that we can inject failures.
dn = cluster.getDataNodes().get(0);
metrics = dn.getMetrics();
assertNotNull(metrics);
List<BPOfferService> allBpos = dn.getAllBpOs();
assertNotNull(allBpos);
assertEquals(1, allBpos.size());
BPOfferService bpos = allBpos.get(0);
List<BPServiceActor> allBpsa = bpos.getBPServiceActors();
assertNotNull(allBpsa);
assertEquals(1, allBpsa.size());
bpsa = allBpsa.get(0);
assertNotNull(bpsa);
// The lifeline RPC proxy is created on a separate thread, so poll until it
// is available.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
if (bpsa.getLifelineNameNodeProxy() != null) {
lifelineNamenode = spy(bpsa.getLifelineNameNodeProxy());
bpsa.setLifelineNameNode(lifelineNamenode);
}
return lifelineNamenode != null;
}
}, 100, 10000);
assertNotNull(bpsa.getNameNodeProxy());
namenode = spy(bpsa.getNameNodeProxy());
bpsa.setNameNode(namenode);
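// Note: the Answer implementations installed by the tests delegate to the
// real proxies via callRealMethod(), so intercepted RPCs still reach the
// NameNode after any injected delay.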
}

@After
public void shutdown() {
if (cluster != null) {
cluster.shutdown();
GenericTestUtils.assertNoThreadsMatching(".*lifeline.*");
}
}

@Test
public void testSendLifelineIfHeartbeatBlocked() throws Exception {
// Run the test until 10 lifeline RPC messages have been sent.
int numLifelines = 10;
CountDownLatch lifelinesSent = new CountDownLatch(numLifelines);
// Intercept heartbeat to inject an artificial delay, until all expected
// lifeline RPC messages have been sent.
doAnswer(new LatchAwaitingAnswer<HeartbeatResponse>(lifelinesSent))
.when(namenode).sendHeartbeat(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class),
any(SlowDiskReports.class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
.when(lifelineNamenode).sendLifeline(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class));
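// Without the lifeline, blocking the heartbeats like this would let the
// DataNode go stale after 6 seconds and dead after roughly 10, per the
// intervals configured in setup, and the assertions below would fail.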
// While waiting on the latch for the expected number of lifeline messages,
// poll DataNode tracking information. Thanks to the lifeline, we expect
// that the DataNode always stays alive, and never goes stale or dead.
while (!lifelinesSent.await(1, SECONDS)) {
assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
namesystem.getNumLiveDataNodes());
assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
namesystem.getNumDeadDataNodes());
assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
namesystem.getNumStaleDataNodes());
}
// Verify that we did in fact call the lifeline RPC.
verify(lifelineNamenode, atLeastOnce()).sendLifeline(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class));
// Also verify lifeline call through metrics. We expect at least
// numLifelines, guaranteed by waiting on the latch. There is a small
// possibility of extra lifeline calls depending on timing, so we allow
// slack in the assertion.
assertTrue("Expect metrics to count at least " + numLifelines + " calls.",
getLongCounter("LifelinesNumOps", getMetrics(metrics.name())) >=
numLifelines);
}

@Test
public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
// Run the test until 10 heartbeat RPC messages have been sent.
int numHeartbeats = 10;
CountDownLatch heartbeatsSent = new CountDownLatch(numHeartbeats);
// Intercept heartbeat to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<HeartbeatResponse>(heartbeatsSent))
.when(namenode).sendHeartbeat(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class),
any(SlowDiskReports.class));
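// No artificial delay is injected here, so heartbeats complete on schedule
// and the lifeline thread should never need to fire.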
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always
// stays alive, and never goes stale or dead.
while (!heartbeatsSent.await(1, SECONDS)) {
assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
namesystem.getNumLiveDataNodes());
assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
namesystem.getNumDeadDataNodes());
assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
namesystem.getNumStaleDataNodes());
}
// Verify that we did not call the lifeline RPC.
verify(lifelineNamenode, never()).sendLifeline(
any(DatanodeRegistration.class),
any(StorageReport[].class),
anyLong(),
anyLong(),
anyInt(),
anyInt(),
anyInt(),
any(VolumeFailureSummary.class));
// Also verify no lifeline calls through metrics.
assertEquals("Expect metrics to count no lifeline calls.", 0,
getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
}

@Test
public void testLifelineForDeadNode() throws Exception {
long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
assertTrue(initialCapacity > 0);
dn.setHeartbeatsDisabledForTests(true);
cluster.setDataNodesDead();
assertEquals("Capacity should be 0 after all DNs dead", 0, cluster
.getNamesystem(0).getCapacityTotal());
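// The NameNode is expected to ignore lifelines from a node it considers
// dead (a lifeline refreshes an existing registration rather than creating
// one), so reported capacity must remain 0 until the node re-registers.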
bpsa.sendLifelineForTests();
assertEquals("Lifeline should be ignored for dead node", 0, cluster
.getNamesystem(0).getCapacityTotal());
// Wait for re-registration and heartbeat
dn.setHeartbeatsDisabledForTests(false);
final DatanodeDescriptor dnDesc = cluster.getNamesystem(0).getBlockManager()
.getDatanodeManager().getDatanodes().iterator().next();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return dnDesc.isAlive() && dnDesc.isHeartbeatedSinceRegistration();
}
}, 100, 5000);
assertEquals("Capacity should include only live capacity", initialCapacity,
cluster.getNamesystem(0).getCapacityTotal());
}

/**
* Waits on a {@link CountDownLatch} before calling through to the method.
*/
private final class LatchAwaitingAnswer<T> implements Answer<T> {
private final CountDownLatch latch;
public LatchAwaitingAnswer(CountDownLatch latch) {
this.latch = latch;
}
@Override
@SuppressWarnings("unchecked")
public T answer(InvocationOnMock invocation)
throws Throwable {
LOG.info("Awaiting, remaining latch count is {}.", latch.getCount());
latch.await();
return (T)invocation.callRealMethod();
}
}

/**
* Counts down a {@link CountDownLatch} after each call through to the method.
*/
private final class LatchCountingAnswer<T> implements Answer<T> {
private final CountDownLatch latch;
public LatchCountingAnswer(CountDownLatch latch) {
this.latch = latch;
}
@Override
@SuppressWarnings("unchecked")
public T answer(InvocationOnMock invocation)
throws Throwable {
T result = (T)invocation.callRealMethod();
latch.countDown();
LOG.info("Countdown, remaining latch count is {}.", latch.getCount());
return result;
}
}
}