| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode.ha; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.lang.management.ManagementFactory; |
| import java.lang.management.ThreadInfo; |
| import java.lang.management.ThreadMXBean; |
| import java.net.BindException; |
| import java.net.URI; |
| import java.net.URL; |
| import java.util.List; |
| import java.util.Random; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.LogVerificationAppender; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.MiniDFSNNTopology; |
| import org.apache.hadoop.hdfs.server.common.Util; |
| import org.apache.hadoop.hdfs.server.namenode.*; |
| import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; |
| import org.apache.hadoop.hdfs.util.Canceler; |
| import org.apache.hadoop.io.compress.CompressionCodecFactory; |
| import org.apache.hadoop.io.compress.CompressionOutputStream; |
| import org.apache.hadoop.io.compress.GzipCodec; |
| import org.apache.hadoop.ipc.StandbyException; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; |
| import org.apache.hadoop.test.PathUtils; |
| import org.apache.hadoop.util.Lists; |
| import org.apache.hadoop.util.ThreadUtil; |
| import org.apache.log4j.spi.LoggingEvent; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; |
| import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.junit.Assert.*; |
| import static org.mockito.ArgumentMatchers.any; |
| |
| public class TestStandbyCheckpoints { |
| private static final int NUM_DIRS_IN_LOG = 200000; |
| protected static int NUM_NNS = 3; |
| protected MiniDFSCluster cluster; |
| protected NameNode[] nns = new NameNode[NUM_NNS]; |
| protected FileSystem fs; |
| private final Random random = new Random(); |
| protected File tmpOivImgDir; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestStandbyCheckpoints.class); |
| |
| @SuppressWarnings("rawtypes") |
| @Before |
| public void setupCluster() throws Exception { |
| Configuration conf = setupCommonConfig(); |
| |
| // Dial down the retention of extra edits and checkpoints. This is to |
| // help catch regressions of HDFS-4238 (SBN should not purge shared edits) |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 1); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0); |
| |
| int retryCount = 0; |
| while (true) { |
| try { |
| int basePort = 10060 + random.nextInt(100) * 2; |
| MiniDFSNNTopology topology = new MiniDFSNNTopology() |
| .addNameservice(new MiniDFSNNTopology.NSConf("ns1") |
| .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort)) |
| .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)) |
| .addNN(new MiniDFSNNTopology.NNConf("nn3").setHttpPort(basePort + 2))); |
| |
| cluster = new MiniDFSCluster.Builder(conf) |
| .nnTopology(topology) |
| .numDataNodes(1) |
| .build(); |
| cluster.waitActive(); |
| |
| setNNs(); |
| |
| fs = HATestUtil.configureFailoverFs(cluster, conf); |
| |
| cluster.transitionToActive(0); |
| break; |
| } catch (BindException e) { |
| ++retryCount; |
| LOG.info("Set up MiniDFSCluster failed due to port conflicts; " |
| + "retrying (attempt " + retryCount + ")"); |
| } |
| } |
| } |
| |
| /** Cache references to each of the cluster's NameNodes in {@link #nns}. */ |
| protected void setNNs() { |
| for (int i = 0; i < NUM_NNS; i++) { |
| nns[i] = cluster.getNameNode(i); |
| } |
| } |
| |
| /** |
| * Build a configuration that checkpoints aggressively, writes the legacy |
| * OIV image to a temporary directory, and compresses images with |
| * {@link SlowCodec} so that tests can catch a checkpoint in progress. |
| */ |
| protected Configuration setupCommonConfig() { |
| tmpOivImgDir = GenericTestUtils.getTestDir("TestStandbyCheckpoints"); |
| tmpOivImgDir.mkdirs(); |
| |
| Configuration conf = new Configuration(); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5); |
| conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY, |
| tmpOivImgDir.getAbsolutePath()); |
| conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true); |
| conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, |
| SlowCodec.class.getCanonicalName()); |
| CompressionCodecFactory.setCodecClasses(conf, |
| ImmutableList.<Class>of(SlowCodec.class)); |
| return conf; |
| } |
| |
| @After |
| public void shutdownCluster() throws IOException { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| |
| if (tmpOivImgDir != null) { |
| FileUtil.fullyDelete(tmpOivImgDir); |
| } |
| } |
| |
| @Test(timeout = 300000) |
| public void testSBNCheckpoints() throws Exception { |
| JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nns[1]); |
| |
| doEdits(0, 10); |
| HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]); |
| // Once the standby catches up, it should notice that it needs to |
| // do a checkpoint and save one to its local directories. |
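| // Txid 12 = segment-start txn + 10 mkdir txns + segment-end txn. |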
| HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12)); |
| |
| GenericTestUtils.waitFor(() -> tmpOivImgDir.list().length > 0, |
| 1000, 60000); |
| |
| // It should have saved the oiv image too. |
| assertEquals("One file is expected", 1, tmpOivImgDir.list().length); |
| |
| // It should also upload it back to the active. |
| HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12)); |
| |
| // The standby should never try to purge edit logs on shared storage. |
| Mockito.verify(standbyJournalSet, Mockito.never()). |
| purgeLogsOlderThan(Mockito.anyLong()); |
| } |
| |
| @Test |
| public void testNewDirInitAfterCheckpointing() throws Exception { |
| File hdfsDir = new File(PathUtils.getTestDir(TestStandbyCheckpoints.class), |
| "testNewDirInitAfterCheckpointing"); |
| File nameDir = new File(hdfsDir, "name1"); |
| assertTrue(nameDir.mkdirs()); |
| |
| // Restart nn0 with an additional name dir. |
| String existingDir = cluster.getConfiguration(0). |
| get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY); |
| cluster.getConfiguration(0).set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, |
| existingDir + "," + Util.fileAsURI(nameDir).toString()); |
| cluster.restartNameNode(0); |
| nns[0] = cluster.getNameNode(0); |
| cluster.transitionToActive(0); |
| |
| // "current" is created, but current/VERSION isn't. |
| File currDir = new File(nameDir, "current"); |
| File versionFile = new File(currDir, "VERSION"); |
| assertTrue(currDir.exists()); |
| assertFalse(versionFile.exists()); |
| |
| // Trigger a checkpointing and upload. |
| doEdits(0, 10); |
| HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]); |
| |
| // The VERSION file is created once a checkpoint has been uploaded. |
| // Wait up to 10 seconds for that to happen. |
| for (int i = 0; i < 20 && !versionFile.exists(); i++) { |
| Thread.sleep(500); |
| } |
| // VERSION must have been created. |
| assertTrue(versionFile.exists()); |
| } |
| |
| /** |
| * Test for the case when both of the NNs in the cluster are |
| * in the standby state, and thus are both creating checkpoints |
| * and uploading them to each other. |
| * In this circumstance, each should receive an error from the |
| * other node indicating that it already has a checkpoint for the |
| * given txid, but this should not cause an abort or other failure. |
| */ |
| @Test(timeout = 300000) |
| public void testBothNodesInStandbyState() throws Exception { |
| doEdits(0, 10); |
| |
| cluster.transitionToStandby(0); |
| |
| // Transitioning to standby closed the edit log on the active, |
| // so the standby will catch up. Then, both will be in standby mode |
| // with enough uncheckpointed txns to cause a checkpoint, and they |
| // will each try to take a checkpoint and upload to each other. |
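| // 12 txns: segment start + 10 mkdirs + segment end (see testSBNCheckpoints). |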
| HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12)); |
| HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12)); |
| |
| assertEquals(12, nns[0].getNamesystem().getFSImage() |
| .getMostRecentCheckpointTxId()); |
| assertEquals(12, nns[1].getNamesystem().getFSImage() |
| .getMostRecentCheckpointTxId()); |
| |
| List<File> dirs = Lists.newArrayList(); |
| dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0)); |
| dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1)); |
| FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of()); |
| } |
| |
| /** |
| * Test that, when there are observer NameNodes, the standby node is |
| * able to upload the fsimage to the observer nodes as well. |
| */ |
| @Test(timeout = 300000) |
| public void testStandbyAndObserverState() throws Exception { |
| // Transition nns[2] to observer. |
| cluster.transitionToObserver(2); |
| doEdits(0, 10); |
| // After a rollEditLog, the standby's (nns[1]) next checkpoint will be |
| // ahead of the observer's (nns[2]). |
| nns[0].getRpcServer().rollEditLog(); |
| |
| // After the standby creates a checkpoint, it will try to push the image |
| // to the active and all observers, updating its own txid to the most |
| // recent. |
| HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12)); |
| HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12)); |
| HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12)); |
| |
| assertEquals(12, nns[2].getNamesystem().getFSImage() |
| .getMostRecentCheckpointTxId()); |
| assertEquals(12, nns[1].getNamesystem().getFSImage() |
| .getMostRecentCheckpointTxId()); |
| |
| List<File> dirs = Lists.newArrayList(); |
| // The observer and the standby should both have the same image. |
| dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2)); |
| dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1)); |
| FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of()); |
| // Restore nns[2] back to standby. |
| cluster.transitionToStandby(2); |
| } |
| |
| /** |
| * Tests that a null FSImage is handled gracefully by the ImageServlet. |
| * If putImage is called while a NameNode is still starting up, the FSImage |
| * may not have been initialized yet. See HDFS-15290. |
| */ |
| @Test(timeout = 30000) |
| public void testCheckpointBeforeNameNodeInitializationIsComplete() |
| throws Exception { |
| final LogVerificationAppender appender = new LogVerificationAppender(); |
| final org.apache.log4j.Logger logger = org.apache.log4j.Logger |
| .getRootLogger(); |
| logger.addAppender(appender); |
| |
| // Transition nns[2] to observer. |
| cluster.transitionToObserver(2); |
| doEdits(0, 10); |
| // After a rollEditLog, the standby's (nns[1]) next checkpoint will be |
| // ahead of the observer's (nns[2]). |
| nns[0].getRpcServer().rollEditLog(); |
| |
| NameNode nn2 = nns[2]; |
| FSImage nnFSImage = NameNodeAdapter.getAndSetFSImageInHttpServer(nn2, null); |
| |
| // After the standby creates a checkpoint, it will try to push the image |
| // to the active and all observers, updating its own txid to the most |
| // recent. |
| HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12)); |
| HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12)); |
| |
| NameNodeAdapter.getAndSetFSImageInHttpServer(nn2, nnFSImage); |
| cluster.transitionToStandby(2); |
| logger.removeAppender(appender); |
| |
| for (LoggingEvent event : appender.getLog()) { |
| String message = event.getRenderedMessage(); |
| if (message.contains("PutImage failed") && |
| message.contains("FSImage has not been set in the NameNode.")) { |
| // Logs contain the expected exception. |
| return; |
| } |
| } |
| fail("Expected exception not present in logs."); |
| } |
| |
| /** |
| * Test for the case when the SBN is configured to checkpoint based |
| * on a time period, but no transactions are happening on the |
| * active. Thus, it would want to save a second checkpoint at the |
| * same txid, which is a no-op. This test makes sure this doesn't |
| * cause any problem. |
| */ |
| @Test(timeout = 300000) |
| public void testCheckpointWhenNoNewTransactionsHappened() |
| throws Exception { |
| // Checkpoint as fast as we can, in a tight loop. |
| cluster.getConfiguration(1).setInt( |
| DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0); |
| cluster.restartNameNode(1); |
| nns[1] = cluster.getNameNode(1); |
| |
| FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]); |
| |
| // We shouldn't save any checkpoints at txid=0 |
| Thread.sleep(1000); |
| Mockito.verify(spyImage1, Mockito.never()) |
| .saveNamespace(any()); |
| |
| // Roll the active NN's edit log and wait for the standby to catch up. |
| HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]); |
| Thread.sleep(2000); |
| |
| // We should make exactly one checkpoint at this new txid. |
| Mockito.verify(spyImage1, Mockito.times(1)).saveNamespace( |
| any(), Mockito.eq(NameNodeFile.IMAGE), any()); |
| } |
| |
| /** |
| * Test cancellation of ongoing checkpoints when failover happens |
| * mid-checkpoint. |
| */ |
| @Test(timeout=120000) |
| public void testCheckpointCancellation() throws Exception { |
| cluster.transitionToStandby(0); |
| |
| // Create an edit log in the shared edits dir with a lot of mkdirs |
| // operations. This is solely so that the image is large enough to |
| // take a non-trivial amount of time to load (it is still only ~15MB). |
| URI sharedUri = cluster.getSharedEditsDir(0, 1); |
| File sharedDir = new File(sharedUri.getPath(), "current"); |
| File tmpDir = new File(MiniDFSCluster.getBaseDirectory(), |
| "testCheckpointCancellation-tmp"); |
| FSNamesystem fsn = cluster.getNamesystem(0); |
| FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, NUM_DIRS_IN_LOG, 3, |
| fsn.getFSDirectory().getLastInodeId() + 1); |
| String fname = NNStorage.getInProgressEditsFileName(3); |
| new File(tmpDir, fname).renameTo(new File(sharedDir, fname)); |
| |
| // Checkpoint as fast as we can, in a tight loop. |
| cluster.getConfiguration(1).setInt( |
| DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0); |
| cluster.restartNameNode(1); |
| nns[1] = cluster.getNameNode(1); |
| |
| cluster.transitionToActive(0); |
| |
| // Ping-pong the active role between the NNs; a failover should cancel |
| // any checkpoint that is in progress when a node leaves standby state. |
| boolean canceledOne = false; |
| for (int i = 0; i < 10 && !canceledOne; i++) { |
| doEdits(i * 10, i * 10 + 10); |
| cluster.transitionToStandby(0); |
| cluster.transitionToActive(1); |
| cluster.transitionToStandby(1); |
| cluster.transitionToActive(0); |
| canceledOne = StandbyCheckpointer.getCanceledCount() > 0; |
| } |
| |
| assertTrue(canceledOne); |
| } |
| |
| /** |
| * Test cancellation of ongoing checkpoints when failover happens |
| * mid-checkpoint during image upload from standby to active NN. |
| */ |
| @Test(timeout=60000) |
| public void testCheckpointCancellationDuringUpload() throws Exception { |
| // Set dfs.namenode.checkpoint.txns higher on the first NN so that it |
| // does not take a checkpoint itself when it becomes a standby. |
| cluster.getConfiguration(0).setInt( |
| DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000); |
| |
| // Don't compress; we want a big image. |
| for (int i = 0; i < NUM_NNS; i++) { |
| cluster.getConfiguration(i).setBoolean( |
| DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false); |
| } |
| |
| // Throttle the SBN image upload (the rate is in bytes per second) so |
| // that the transfer to the ANN hangs mid-upload. |
| for (int i = 1; i < NUM_NNS; i++) { |
| cluster.getConfiguration(i).setLong( |
| DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100); |
| } |
| for (int i = 0; i < NUM_NNS; i++) { |
| cluster.restartNameNode(i); |
| } |
| |
| // update references to each of the nns |
| setNNs(); |
| |
| cluster.transitionToActive(0); |
| |
| doEdits(0, 100); |
| |
| for (int i = 1; i < NUM_NNS; i++) { |
| HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]); |
| HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104)); |
| } |
| |
| cluster.transitionToStandby(0); |
| cluster.transitionToActive(1); |
| |
| // Wait to make sure the background TransferFsImageUpload thread was |
| // cancelled. This needs to happen before the next test in the suite |
| // starts, so that a file descriptor is not held open during the next |
| // cluster init. |
| cluster.shutdown(); |
| cluster = null; |
| |
| GenericTestUtils.waitFor(() -> { |
| ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); |
| ThreadInfo[] threads = threadBean.getThreadInfo( |
| threadBean.getAllThreadIds(), 1); |
| for (ThreadInfo thread : threads) { |
| // getThreadInfo() returns null entries for threads that have exited. |
| if (thread != null && |
| thread.getThreadName().startsWith("TransferFsImageUpload")) { |
| return false; |
| } |
| } |
| return true; |
| }, 1000, 30000); |
| |
| // Assert that former active did not accept the canceled checkpoint file. |
| assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId()); |
| } |
| |
| /** |
| * Make sure that clients receive StandbyExceptions even when a |
| * checkpoint is in progress on the SBN and the StandbyCheckpointer |
| * thread therefore holds the FSNS lock. Regression test for HDFS-4591. |
| */ |
| @Test(timeout=300000) |
| public void testStandbyExceptionThrownDuringCheckpoint() throws Exception { |
| |
| // Set it up so that we know when the SBN checkpoint starts and ends. |
| FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]); |
| DelayAnswer answerer = new DelayAnswer(LOG); |
| Mockito.doAnswer(answerer).when(spyImage1) |
| .saveNamespace(any(FSNamesystem.class), |
| Mockito.eq(NameNodeFile.IMAGE), any(Canceler.class)); |
| |
| // Perform some edits and wait for a checkpoint to start on the SBN. |
| doEdits(0, 1000); |
| nns[0].getRpcServer().rollEditLog(); |
| answerer.waitForCall(); |
| assertTrue("SBN is not performing checkpoint but it should be.", |
| answerer.getFireCount() == 1 && answerer.getResultCount() == 0); |
| |
| // Make sure that the lock has actually been taken by the checkpointing |
| // thread. |
| ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); |
| try { |
| // Perform an RPC to the SBN and make sure it throws a StandbyException. |
| nns[1].getRpcServer().getFileInfo("/"); |
| fail("Should have thrown StandbyException, but instead succeeded."); |
| } catch (StandbyException se) { |
| GenericTestUtils.assertExceptionContains("is not supported", se); |
| } |
| |
| // Make sure new incremental block reports are processed during |
| // checkpointing on the SBN. |
| assertEquals(0, cluster.getNamesystem(1).getPendingDataNodeMessageCount()); |
| doCreate(); |
| Thread.sleep(1000); |
| assertTrue(cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0); |
| |
| // Make sure that the checkpoint is still going on, implying that the client |
| // RPC to the SBN happened during the checkpoint. |
| assertTrue("SBN should have still been checkpointing.", |
| answerer.getFireCount() == 1 && answerer.getResultCount() == 0); |
| answerer.proceed(); |
| answerer.waitForResult(); |
| assertTrue("SBN should have finished checkpointing.", |
| answerer.getFireCount() == 1 && answerer.getResultCount() == 1); |
| } |
| |
| /** |
| * Make sure that read operations and the web UI remain available on the |
| * SBN while a checkpoint is in progress; only operations that need the |
| * checkpointer's lock should queue behind it. |
| */ |
| @Test(timeout=300000) |
| public void testReadsAllowedDuringCheckpoint() throws Exception { |
| |
| // Set it up so that we know when the SBN checkpoint starts and ends. |
| FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]); |
| DelayAnswer answerer = new DelayAnswer(LOG); |
| Mockito.doAnswer(answerer).when(spyImage1) |
| .saveNamespace(any(FSNamesystem.class), |
| any(NameNodeFile.class), |
| any(Canceler.class)); |
| |
| // Perform some edits and wait for a checkpoint to start on the SBN. |
| doEdits(0, 1000); |
| nns[0].getRpcServer().rollEditLog(); |
| answerer.waitForCall(); |
| assertTrue("SBN is not performing checkpoint but it should be.", |
| answerer.getFireCount() == 1 && answerer.getResultCount() == 0); |
| |
| // Make sure that the lock has actually been taken by the checkpointing |
| // thread. |
| ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); |
| |
| // Perform an RPC that needs to take the write lock. |
| Thread t = new Thread(() -> { |
| try { |
| nns[1].getRpcServer().restoreFailedStorage("false"); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| }); |
| t.start(); |
| |
| // Make sure that our thread is waiting for the lock. |
| ThreadUtil.sleepAtLeastIgnoreInterrupts(1000); |
| |
| assertFalse(nns[1].getNamesystem().getFsLockForTests().hasQueuedThreads()); |
| assertFalse(nns[1].getNamesystem().getFsLockForTests().isWriteLocked()); |
| assertTrue(nns[1].getNamesystem().getCpLockForTests().hasQueuedThreads()); |
| |
| // Get /jmx of the standby NN web UI, which will cause the FSNS read lock to |
| // be taken. |
| String pageContents = DFSTestUtil.urlGet(new URL("http://" + |
| nns[1].getHttpAddress().getHostName() + ":" + |
| nns[1].getHttpAddress().getPort() + "/jmx")); |
| assertTrue(pageContents.contains("NumLiveDataNodes")); |
| |
| // Make sure that the checkpoint is still going on, implying that the client |
| // RPC to the SBN happened during the checkpoint. |
| assertTrue("SBN should have still been checkpointing.", |
| answerer.getFireCount() == 1 && answerer.getResultCount() == 0); |
| answerer.proceed(); |
| answerer.waitForResult(); |
| assertTrue("SBN should have finished checkpointing.", |
| answerer.getFireCount() == 1 && answerer.getResultCount() == 1); |
| |
| t.join(); |
| } |
| |
| /** |
| * Test that standby NNs can still upload an FSImage to the ANN after |
| * becoming non-primary standby NNs. See HDFS-9787. |
| */ |
| @Test(timeout=300000) |
| public void testNonPrimarySBNUploadFSImage() throws Exception { |
| // Shut down all standby NNs. |
| for (int i = 1; i < NUM_NNS; i++) { |
| cluster.shutdownNameNode(i); |
| |
| // Checkpoint as fast as we can, in a tight loop. |
| cluster.getConfiguration(i).setInt( |
| DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1); |
| } |
| |
| doEdits(0, 10); |
| cluster.transitionToStandby(0); |
| |
| // Standby NNs take checkpoints even without an active NN available. |
| for (int i = 1; i < NUM_NNS; i++) { |
| cluster.restartNameNode(i, false); |
| } |
| cluster.waitClusterUp(); |
| |
| for (int i = 0; i < NUM_NNS; i++) { |
| // Once the standby catches up, it should do a checkpoint |
| // and save to local directories. |
| HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(12)); |
| } |
| |
| cluster.transitionToActive(0); |
| |
| // Wait 2 seconds so that the last upload time expires. |
| Thread.sleep(2000); |
| |
| doEdits(11, 20); |
| nns[0].getRpcServer().rollEditLog(); |
| |
| // One of the standby NNs should also upload it back to the active. |
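| // Txid 23 = previous checkpoint (12) + segment start + 9 mkdirs + segment end. |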
| HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(23)); |
| } |
| |
| /** |
| * Test that checkpointing is still successful even if an issue |
| * was encountered while writing the legacy OIV image. |
| */ |
| @Test(timeout=300000) |
| public void testCheckpointSucceedsWithLegacyOIVException() throws Exception { |
| // Delete the OIV image dir to cause an IOException while saving |
| FileUtil.fullyDelete(tmpOivImgDir); |
| |
| doEdits(0, 10); |
| HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]); |
| // Once the standby catches up, it should notice that it needs to |
| // do a checkpoint and save one to its local directories. |
| HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12)); |
| |
| // It should also upload it back to the active. |
| HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12)); |
| } |
| |
| /** Create directories /test{start} through /test{stop - 1}. */ |
| private void doEdits(int start, int stop) throws IOException { |
| for (int i = start; i < stop; i++) { |
| Path p = new Path("/test" + i); |
| fs.mkdirs(p); |
| } |
| } |
| |
| /** Create (replacing any existing file) a small single-replica file. */ |
| private void doCreate() throws IOException { |
| Path p = new Path("/testFile"); |
| fs.delete(p, false); |
| FSDataOutputStream out = fs.create(p, (short)1); |
| out.write(42); |
| out.close(); |
| } |
| |
| /** |
| * A codec which just slows down the saving of the image significantly |
| * by sleeping a few milliseconds on every write. This makes it easy to |
| * catch the standby in the middle of saving a checkpoint. |
| */ |
| public static class SlowCodec extends GzipCodec { |
| @Override |
| public CompressionOutputStream createOutputStream(OutputStream out) |
| throws IOException { |
| CompressionOutputStream ret = super.createOutputStream(out); |
| CompressionOutputStream spy = Mockito.spy(ret); |
| Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5)) |
| .when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt()); |
| return spy; |
| } |
| } |
| |
| } |