// blob: bba3dbb11966bed2ae907145381b946f7da2489a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.base.Supplier;
/**
* Test cases regarding pipeline recovery during NN failover.
*/
public class TestPipelinesFailover {
static {
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(
"org.apache.hadoop.io.retry.RetryInvocationHandler")).getLogger().setLevel(Level.ALL);
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
}
// Logger used by the tests and helpers in this class.
protected static final Log LOG = LogFactory.getLog(
TestPipelinesFailover.class);
// File that the single-file (non-stress) tests create and verify.
private static final Path TEST_PATH =
new Path("/test-file");
// Value the tests set for DFS_BLOCK_SIZE_KEY, in bytes.
private static final int BLOCK_SIZE = 4096;
// One and a half blocks (6144 bytes): the unit the tests write so that a
// write always ends part-way through a block.
private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
// Number of PipelineTestThread instances started by
// testPipelineRecoveryStress.
private static final int STRESS_NUM_THREADS = 25;
// Passed to TestContext.waitFor() by the stress test (presumably
// milliseconds -- confirm against TestContext); that test's JUnit timeout is
// three times this value.
private static final int STRESS_RUNTIME = 40000;
/**
 * The ways in which the tests move the active role from NN 0 to NN 1.
 * Every scenario finishes by transitioning NN 1 to active; the scenarios
 * differ only in what is done to NN 0 beforehand.
 */
enum TestScenario {
  /** NN 0 is transitioned to standby before NN 1 becomes active. */
  GRACEFUL_FAILOVER {
    @Override
    void takeDownOriginalActive(MiniDFSCluster cluster) throws IOException {
      cluster.transitionToStandby(0);
    }
  },
  /** NN 0 is restarted (not transitioned) before NN 1 becomes active. */
  ORIGINAL_ACTIVE_CRASHED {
    @Override
    void takeDownOriginalActive(MiniDFSCluster cluster) throws IOException {
      cluster.restartNameNode(0);
    }
  };

  /**
   * Scenario-specific step applied to NN 0 before NN 1 is made active.
   *
   * @param cluster the HA mini cluster being driven
   * @throws IOException if the cluster operation fails
   */
  abstract void takeDownOriginalActive(MiniDFSCluster cluster)
      throws IOException;

  /**
   * Runs the scenario: applies the scenario-specific step to NN 0 and then
   * transitions NN 1 to active.
   *
   * @param cluster the HA mini cluster being driven
   * @throws IOException if any of the cluster operations fails
   */
  void run(MiniDFSCluster cluster) throws IOException {
    takeDownOriginalActive(cluster);
    cluster.transitionToActive(1);
  }
}
/**
 * Selects which client RPC is the first one issued after the failover in
 * {@code doWriteOverFailoverTest}, and therefore which operation has its
 * idempotence exercised.
 */
enum MethodToTestIdempotence {
  /** More data is written after the failover, forcing a block allocation. */
  ALLOCATE_BLOCK,
  /** Nothing more is written, so the next RPC is the file completion. */
  COMPLETE_FILE
}
/**
 * Tests continuing a write pipeline over a graceful failover, where the
 * first RPC after the failover is a block allocation.
 */
@Test(timeout=30000)
public void testWriteOverGracefulFailover() throws Exception {
  final TestScenario failoverKind = TestScenario.GRACEFUL_FAILOVER;
  final MethodToTestIdempotence firstRpcAfterFailover =
      MethodToTestIdempotence.ALLOCATE_BLOCK;
  doWriteOverFailoverTest(failoverKind, firstRpcAfterFailover);
}
/**
 * Same as the graceful-failover write test, but NN 0 is restarted instead
 * of being transitioned; the first RPC after the failover is a block
 * allocation.
 */
@Test(timeout=30000)
public void testAllocateBlockAfterCrashFailover() throws Exception {
  final TestScenario failoverKind = TestScenario.ORIGINAL_ACTIVE_CRASHED;
  final MethodToTestIdempotence firstRpcAfterFailover =
      MethodToTestIdempotence.ALLOCATE_BLOCK;
  doWriteOverFailoverTest(failoverKind, firstRpcAfterFailover);
}
/**
 * Restarts NN 0 while a file is being written and then closes the file
 * without writing more data, so that the first RPC after the failover is
 * the file completion.
 */
@Test(timeout=30000)
public void testCompleteFileAfterCrashFailover() throws Exception {
  final TestScenario failoverKind = TestScenario.ORIGINAL_ACTIVE_CRASHED;
  final MethodToTestIdempotence firstRpcAfterFailover =
      MethodToTestIdempotence.COMPLETE_FILE;
  doWriteOverFailoverTest(failoverKind, firstRpcAfterFailover);
}
/**
 * Writes a block and a half through NN 0, fails over to NN 1 using the
 * given scenario, and then finishes the file so that the first RPC the
 * client sends after the failover is the one selected by
 * {@code methodToTest}. Finally verifies the file contents.
 *
 * @param scenario how the active role is moved from NN 0 to NN 1
 * @param methodToTest the RPC whose idempotence is being exercised
 * @throws Exception if a cluster operation or a verification step fails
 */
private void doWriteOverFailoverTest(TestScenario scenario,
    MethodToTestIdempotence methodToTest) throws Exception {
  final Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
  // Don't check replication periodically.
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);

  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(3)
      .build();
  FSDataOutputStream out = null;
  try {
    cluster.waitActive();
    cluster.transitionToActive(0);
    Thread.sleep(500);
    LOG.info("Starting with NN 0 active");

    final FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    out = fs.create(TEST_PATH);

    // First block and a half, flushed so that every block has been written
    // out before the failover happens.
    AppendTestUtil.write(out, 0, BLOCK_AND_A_HALF);
    int bytesWritten = BLOCK_AND_A_HALF;
    out.hflush();

    LOG.info("Failing over to NN 1");
    scenario.run(cluster);

    // NOTE: deliberately make no further metadata calls to the NN at this
    // point. The next IPC must be the operation under test; any other call
    // would notice the failover first and the idempotence of that operation
    // would not be tested (HDFS-3031).
    final FSNamesystem newActive = cluster.getNameNode(1).getNamesystem();
    BlockManagerTestUtil.updateState(newActive.getBlockManager());
    assertEquals(0, newActive.getPendingReplicationBlocks());
    assertEquals(0, newActive.getCorruptReplicaBlocks());
    assertEquals(0, newActive.getMissingBlocksCount());

    // When testing allocateBlock()'s idempotence, write another block and a
    // half so that a new block has to be allocated. Otherwise write nothing,
    // so that the next RPC is completeFile().
    final boolean forceAllocation =
        methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK;
    if (forceAllocation) {
      AppendTestUtil.write(out, bytesWritten, BLOCK_AND_A_HALF);
      bytesWritten += BLOCK_AND_A_HALF;
    }

    out.close();
    // Already closed; nothing left for the finally block to close.
    out = null;

    AppendTestUtil.check(fs, TEST_PATH, bytesWritten);
  } finally {
    IOUtils.closeStream(out);
    cluster.shutdown();
  }
}
/**
 * Tests continuing a write pipeline over a graceful failover when a DN is
 * stopped after the failover. This ensures that updating the pipeline
 * succeeds even though the pipeline was constructed on a different NN.
 */
@Test(timeout=30000)
public void testWriteOverGracefulFailoverWithDnFail() throws Exception {
  final TestScenario failoverKind = TestScenario.GRACEFUL_FAILOVER;
  doTestWriteOverFailoverWithDnFail(failoverKind);
}
/**
 * Variant of the DN-failure pipeline test in which NN 0 is restarted rather
 * than gracefully transitioned to standby.
 */
@Test(timeout=30000)
public void testWriteOverCrashFailoverWithDnFail() throws Exception {
  final TestScenario failoverKind = TestScenario.ORIGINAL_ACTIVE_CRASHED;
  doTestWriteOverFailoverWithDnFail(failoverKind);
}
/**
 * Writes three chunks of a block and a half each to one open file. Between
 * the first and second chunk the cluster fails over to NN 1 and DN 0 is
 * stopped; between the second and third it fails back to NN 0 and DN 1 is
 * stopped. Finally verifies the contents of the closed file.
 *
 * @param scenario how the active role is moved from NN 0 to NN 1
 * @throws Exception if a cluster operation or a verification step fails
 */
private void doTestWriteOverFailoverWithDnFail(TestScenario scenario)
    throws Exception {
  final Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);

  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(5)
      .build();
  FSDataOutputStream out = null;
  try {
    cluster.waitActive();
    cluster.transitionToActive(0);
    Thread.sleep(500);
    LOG.info("Starting with NN 0 active");

    final FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    out = fs.create(TEST_PATH);

    // Running offset into the file; doubles as the total length at the end.
    int offset = 0;

    // First chunk, flushed so that all blocks are written before failover.
    AppendTestUtil.write(out, offset, BLOCK_AND_A_HALF);
    offset += BLOCK_AND_A_HALF;
    out.hflush();

    LOG.info("Failing over to NN 1");
    scenario.run(cluster);
    assertTrue(fs.exists(TEST_PATH));

    cluster.stopDataNode(0);

    // Second chunk, written after the failover and the first DN stop.
    AppendTestUtil.write(out, offset, BLOCK_AND_A_HALF);
    offset += BLOCK_AND_A_HALF;
    out.hflush();

    LOG.info("Failing back to NN 0");
    cluster.transitionToStandby(1);
    cluster.transitionToActive(0);

    cluster.stopDataNode(1);

    // Third chunk, written after failing back and the second DN stop.
    AppendTestUtil.write(out, offset, BLOCK_AND_A_HALF);
    offset += BLOCK_AND_A_HALF;
    out.hflush();

    out.close();
    // Already closed; nothing left for the finally block to close.
    out = null;

    AppendTestUtil.check(fs, TEST_PATH, offset);
  } finally {
    IOUtils.closeStream(out);
    cluster.shutdown();
  }
}
/**
 * Tests lease recovery when the writing client never closes its file and a
 * different user recovers the lease after a NN failover. This approximates
 * the use case of HBase WALs being recovered after a NN failover.
 */
@Test(timeout=30000)
public void testLeaseRecoveryAfterFailover() throws Exception {
  final Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
  // Disable permissions so that another user can recover the lease.
  conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);

  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(3)
      .build();
  FSDataOutputStream out = null;
  try {
    cluster.waitActive();
    cluster.transitionToActive(0);
    Thread.sleep(500);
    LOG.info("Starting with NN 0 active");

    final FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    out = fs.create(TEST_PATH);

    // Write a block and a half and flush it, but leave the stream open.
    AppendTestUtil.write(out, 0, BLOCK_AND_A_HALF);
    out.hflush();

    LOG.info("Failing over to NN 1");
    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);
    assertTrue(fs.exists(TEST_PATH));

    // A second user takes the lease over from the original writer.
    final FileSystem recovererFs = createFsAsOtherUser(cluster, conf);
    loopRecoverLease(recovererFs, TEST_PATH);
    AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);

    // Fail back to ensure that the block locations weren't lost on the
    // original node.
    cluster.transitionToStandby(1);
    cluster.transitionToActive(0);
    AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
  } finally {
    IOUtils.closeStream(out);
    cluster.shutdown();
  }
}
/**
 * Test the scenario where the NN fails over after issuing a block
 * synchronization request, but before it is committed. The
 * DN running the recovery should then fail to commit the synchronization
 * and a later retry will succeed.
 *
 * @throws Exception if a cluster operation or a verification step fails
 */
@Test(timeout=30000)
public void testFailoverRightBeforeCommitSynchronization() throws Exception {
  final Configuration conf = new Configuration();
  // Disable permissions so that another user can recover the lease.
  conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
  conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
  FSDataOutputStream stm = null;
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(3)
      .build();
  try {
    cluster.waitActive();
    cluster.transitionToActive(0);
    Thread.sleep(500);
    LOG.info("Starting with NN 0 active");
    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    stm = fs.create(TEST_PATH);

    // write a half block
    AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
    stm.hflush();

    // Look into the block manager on the active node for the block
    // under construction.
    NameNode nn0 = cluster.getNameNode(0);
    ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
    DatanodeDescriptor expectedPrimary =
        DFSTestUtil.getExpectedPrimaryNode(nn0, blk);
    LOG.info("Expecting block recovery to be triggered on DN " +
        expectedPrimary);

    // Find the corresponding DN daemon, and spy on its connection to the
    // active.
    DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
    DatanodeProtocolClientSideTranslatorPB nnSpy =
        DataNodeTestUtils.spyOnBposToNN(primaryDN, nn0);

    // Delay the commitBlockSynchronization call. The generation stamp is
    // matched with anyLong() rather than anyInt(): the stub is meant to
    // catch every call for this block, and a type-checking anyInt() matcher
    // would not match a Long argument, so the call would never be delayed.
    DelayAnswer delayer = new DelayAnswer(LOG);
    Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
        Mockito.eq(blk),
        Mockito.anyLong(), // new genstamp
        Mockito.anyLong(), // new length
        Mockito.eq(true), // close file
        Mockito.eq(false), // delete block
        (DatanodeID[]) Mockito.anyObject(), // new targets
        (String[]) Mockito.anyObject()); // new target storages

    // The first recovery attempt only starts recovery; it is expected to
    // report that the file is not closed yet.
    DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
    assertFalse(fsOtherUser.recoverLease(TEST_PATH));

    LOG.info("Waiting for commitBlockSynchronization call from primary");
    delayer.waitForCall();

    LOG.info("Failing over to NN 1");
    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);

    // Let the commitBlockSynchronization call go through, and check that
    // it failed with the correct exception.
    delayer.proceed();
    delayer.waitForResult();
    Throwable t = delayer.getThrown();
    if (t == null) {
      fail("commitBlockSynchronization call did not fail on standby");
    }
    GenericTestUtils.assertExceptionContains(
        "Operation category WRITE is not supported",
        t);

    // Now, if we try again to recover the block, it should succeed on the new
    // active.
    loopRecoverLease(fsOtherUser, TEST_PATH);
    AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE/2);
  } finally {
    IOUtils.closeStream(stm);
    cluster.shutdown();
  }
}
/**
 * Stress test for pipeline/lease recovery. Starts STRESS_NUM_THREADS
 * threads, each of which repeatedly creates a file and has another client
 * break the lease. While these threads run, the harness fails over back and
 * forth between the two namenodes.
 */
@Test(timeout=STRESS_RUNTIME*3)
public void testPipelineRecoveryStress() throws Exception {
  final HAStressTestHarness harness = new HAStressTestHarness();
  final Configuration harnessConf = harness.conf;
  // Disable permissions so that another user can recover the lease.
  harnessConf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
  // This test triggers rapid NN failovers. The client retry policy uses an
  // exponential backoff, which can quickly lead to long sleep times and even
  // time out the whole test. Cap the sleep time at 1s to prevent this.
  harnessConf.setInt(
      DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, 1000);

  final MiniDFSCluster cluster = harness.startCluster();
  try {
    cluster.waitActive();
    cluster.transitionToActive(0);

    final FileSystem writerFs = harness.getFailoverFs();
    final DistributedFileSystem recovererFs =
        createFsAsOtherUser(cluster, harnessConf);

    // One worker per file, all reporting failures to the same context.
    final TestContext workers = new TestContext();
    for (int n = 0; n < STRESS_NUM_THREADS; n++) {
      final Path workerFile = new Path("/test-" + n);
      final PipelineTestThread worker = new PipelineTestThread(
          workers, writerFs, recovererFs, workerFile);
      workers.addThread(worker);
    }

    // Start a separate thread which will make sure that replication
    // happens quickly by triggering deletion reports and replication
    // work calculation frequently.
    harness.addReplicationTriggerThread(500);
    harness.addFailoverThread(5000);
    harness.startThreads();
    workers.startThreads();

    workers.waitFor(STRESS_RUNTIME);
    workers.stop();
    harness.stopThreads();
  } finally {
    System.err.println("===========================\n\n\n\n");
    harness.shutdown();
  }
}
/**
 * Test thread whose repeated action is: create (overwrite) a file, write
 * and flush 100 bytes, have a second user recover the lease on it, and then
 * check that the file's contents are readable. Any exception thrown by the
 * action other than the final close failure is left to the
 * {@link RepeatingTestThread} machinery to report.
 */
private static class PipelineTestThread extends RepeatingTestThread {
  /** File system of the user that writes the file. */
  private final FileSystem writerFs;
  /** File system of the user that recovers the lease. */
  private final FileSystem recovererFs;
  /** File this thread creates on every iteration. */
  private final Path target;

  /**
   * @param ctx test context this thread belongs to
   * @param fs file system used to create, write and verify the file
   * @param fsOtherUser file system used for the recoverLease calls
   * @param p path of the file this thread works on
   */
  public PipelineTestThread(TestContext ctx,
      FileSystem fs, FileSystem fsOtherUser, Path p) {
    super(ctx);
    this.target = p;
    this.writerFs = fs;
    this.recovererFs = fsOtherUser;
  }

  @Override
  public void doAnAction() throws Exception {
    final FSDataOutputStream out = writerFs.create(target, true);
    try {
      AppendTestUtil.write(out, 0, 100);
      out.hflush();
      loopRecoverLease(recovererFs, target);
      AppendTestUtil.check(writerFs, target, 100);
    } finally {
      closeAfterLeaseLoss(out);
    }
  }

  /** Closes the writer's stream, ignoring the IOException a close may raise. */
  private static void closeAfterLeaseLoss(FSDataOutputStream out) {
    try {
      out.close();
    } catch (IOException ignored) {
      // should expect this since we lost the lease
    }
  }

  @Override
  public String toString() {
    return "Pipeline test thread for " + target;
  }
}
/**
 * Opens a failover-configured file system for the cluster while running as
 * the testing user "otheruser" (group "othergroup").
 *
 * @param cluster the HA mini cluster to connect to
 * @param conf the configuration to build the file system from
 * @return the file system created inside the other user's doAs context
 * @throws IOException if the file system cannot be created
 * @throws InterruptedException if the doAs call is interrupted
 */
private DistributedFileSystem createFsAsOtherUser(
    final MiniDFSCluster cluster, final Configuration conf)
    throws IOException, InterruptedException {
  final UserGroupInformation otherUser =
      UserGroupInformation.createUserForTesting(
          "otheruser", new String[] { "othergroup"});

  final PrivilegedExceptionAction<FileSystem> openFailoverFs =
      new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          return HATestUtil.configureFailoverFs(cluster, conf);
        }
      };

  final FileSystem fsAsOtherUser = otherUser.doAs(openFailoverFs);
  return (DistributedFileSystem) fsAsOtherUser;
}
/**
 * Try to recover the lease on the given file for up to 60 seconds.
 * The recoverLease call is repeated through GenericTestUtils.waitFor until
 * it reports success. An IOException from recoverLease is rethrown wrapped
 * in a RuntimeException, which ends the polling.
 *
 * @param fsOtherUser the filesystem to use for the recoverLease call; it is
 *          cast to DistributedFileSystem on each attempt
 * @param testPath the path on which to run lease recovery
 * @throws TimeoutException if lease recover does not succeed within 60
 *           seconds; the exception raised by the wait helper is attached as
 *           its cause
 * @throws InterruptedException if the thread is interrupted
 */
private static void loopRecoverLease(
    final FileSystem fsOtherUser, final Path testPath)
    throws TimeoutException, InterruptedException {
  try {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        boolean success;
        try {
          success = ((DistributedFileSystem)fsOtherUser)
              .recoverLease(testPath);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        if (!success) {
          LOG.info("Waiting to recover lease successfully");
        }
        return success;
      }
    }, 1000, 60000);
  } catch (TimeoutException e) {
    // Replace the generic timeout message with one naming the file, but
    // keep the original exception so its diagnostics are not lost.
    // TimeoutException has no cause-taking constructor, hence initCause.
    final TimeoutException timedOut = new TimeoutException(
        "Timed out recovering lease for " + testPath);
    timedOut.initCause(e);
    throw timedOut;
  }
}
}