/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.BindException;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.*;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ThreadUtil;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
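/**
 * Tests checkpointing behavior of the standby NameNode(s) in an HA
 * MiniDFSCluster: when checkpoints are taken, how images are uploaded to
 * the other NameNodes, and how in-progress checkpoints interact with
 * failover, client RPCs, and upload cancellation.
 */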
public class TestStandbyCheckpoints {
private static final int NUM_DIRS_IN_LOG = 200000;
protected static int NUM_NNS = 3;
protected MiniDFSCluster cluster;
protected NameNode[] nns = new NameNode[NUM_NNS];
protected FileSystem fs;
private final Random random = new Random();
protected File tmpOivImgDir;
private static final Logger LOG = LoggerFactory.getLogger(TestStandbyCheckpoints.class);
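  /**
   * Start a three-NameNode HA cluster with a single DataNode and
   * transition nn0 to active, retrying with a new random HTTP port range
   * if the chosen ports are already in use.
   */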
@SuppressWarnings("rawtypes")
@Before
public void setupCluster() throws Exception {
Configuration conf = setupCommonConfig();
// Dial down the retention of extra edits and checkpoints. This is to
// help catch regressions of HDFS-4238 (SBN should not purge shared edits)
conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
int retryCount = 0;
while (true) {
try {
int basePort = 10060 + random.nextInt(100) * 2;
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1))
.addNN(new MiniDFSNNTopology.NNConf("nn3").setHttpPort(basePort + 2)));
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology)
.numDataNodes(1)
.build();
cluster.waitActive();
setNNs();
fs = HATestUtil.configureFailoverFs(cluster, conf);
cluster.transitionToActive(0);
        break;
      } catch (BindException e) {
        retryCount++;
        LOG.info("Set up MiniDFSCluster failed due to port conflicts, "
            + "retrying (attempt {})", retryCount);
}
}
}
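  /** Refresh the cached NameNode references from the cluster. */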
  protected void setNNs() {
for (int i = 0; i < NUM_NNS; i++) {
nns[i] = cluster.getNameNode(i);
}
}
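  /**
   * Build the configuration shared by all tests: aggressive checkpoint
   * and edit-tailing intervals, a legacy OIV image directory, and image
   * compression via {@link SlowCodec} so that a checkpoint takes long
   * enough to be observed (and cancelled) mid-save.
   */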
protected Configuration setupCommonConfig() {
tmpOivImgDir = GenericTestUtils.getTestDir("TestStandbyCheckpoints");
tmpOivImgDir.mkdirs();
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY,
tmpOivImgDir.getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName());
CompressionCodecFactory.setCodecClasses(conf,
ImmutableList.<Class>of(SlowCodec.class));
return conf;
}
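  /** Tear down the cluster and remove the temporary OIV image directory. */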
@After
public void shutdownCluster() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
if (tmpOivImgDir != null) {
FileUtil.fullyDelete(tmpOivImgDir);
}
}
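  /**
   * Basic checkpoint flow: after enough edits, the standby should save a
   * checkpoint (and a legacy OIV image) locally, upload it to the active,
   * and never purge edit logs on the shared storage (HDFS-4238).
   */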
@Test(timeout = 300000)
public void testSBNCheckpoints() throws Exception {
JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nns[1]);
doEdits(0, 10);
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
// Once the standby catches up, it should notice that it needs to
// do a checkpoint and save one to its local directories.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return tmpOivImgDir.list().length > 0;
      }
    }, 1000, 60000);
// It should have saved the oiv image too.
assertEquals("One file is expected", 1, tmpOivImgDir.list().length);
// It should also upload it back to the active.
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
// The standby should never try to purge edit logs on shared storage.
Mockito.verify(standbyJournalSet, Mockito.never()).
purgeLogsOlderThan(Mockito.anyLong());
}
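  /**
   * Test that a name directory added on restart is fully initialized,
   * i.e. its current/VERSION file is written, once a checkpoint is
   * uploaded to the restarted NameNode.
   */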
@Test
public void testNewDirInitAfterCheckpointing() throws Exception {
File hdfsDir = new File(PathUtils.getTestDir(TestStandbyCheckpoints.class),
"testNewDirInitAfterCheckpointing");
File nameDir = new File(hdfsDir, "name1");
    assertTrue(nameDir.mkdirs());
// Restart nn0 with an additional name dir.
String existingDir = cluster.getConfiguration(0).
get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
cluster.getConfiguration(0).set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
existingDir + "," + Util.fileAsURI(nameDir).toString());
cluster.restartNameNode(0);
nns[0] = cluster.getNameNode(0);
cluster.transitionToActive(0);
// "current" is created, but current/VERSION isn't.
File currDir = new File(nameDir, "current");
File versionFile = new File(currDir, "VERSION");
    assertTrue(currDir.exists());
    assertFalse(versionFile.exists());
// Trigger a checkpointing and upload.
doEdits(0, 10);
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
    // The VERSION file is created once a checkpoint has been uploaded.
    // Wait up to 10 seconds for that to happen.
for (int i = 0; i < 20; i++) {
if (versionFile.exists()) {
break;
}
Thread.sleep(500);
}
    // VERSION must have been created.
    assertTrue("VERSION file was not created", versionFile.exists());
}
/**
* Test for the case when both of the NNs in the cluster are
* in the standby state, and thus are both creating checkpoints
* and uploading them to each other.
* In this circumstance, they should receive the error from the
* other node indicating that the other node already has a
* checkpoint for the given txid, but this should not cause
   * an abort or any other failure.
*/
@Test(timeout = 300000)
public void testBothNodesInStandbyState() throws Exception {
doEdits(0, 10);
cluster.transitionToStandby(0);
// Transitioning to standby closed the edit log on the active,
// so the standby will catch up. Then, both will be in standby mode
// with enough uncheckpointed txns to cause a checkpoint, and they
// will each try to take a checkpoint and upload to each other.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
assertEquals(12, nns[0].getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
assertEquals(12, nns[1].getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
List<File> dirs = Lists.newArrayList();
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
}
/**
   * Test that, when there are observer NameNodes in the cluster, the
   * standby node is able to upload the fsimage to the observer nodes as
   * well.
*/
@Test(timeout = 300000)
public void testStandbyAndObserverState() throws Exception {
// Transition 2 to observer
cluster.transitionToObserver(2);
doEdits(0, 10);
    // After a rollEditLog, the standby (nn1)'s next checkpoint will be
    // ahead of the observer (nn2).
nns[0].getRpcServer().rollEditLog();
    // After the standby creates a checkpoint, it will try to push the
    // image to the active and all observers, updating its own txid to
    // the most recent.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12));
assertEquals(12, nns[2].getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
assertEquals(12, nns[1].getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
List<File> dirs = Lists.newArrayList();
    // Observer and standby both have the same image.
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2));
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of());
// Restore 2 back to standby
cluster.transitionToStandby(2);
}
/**
* Tests that a null FSImage is handled gracefully by the ImageServlet.
* If putImage is called while a NameNode is still starting up, the FSImage
* may not have been initialized yet. See HDFS-15290.
*/
@Test(timeout = 30000)
public void testCheckpointBeforeNameNodeInitializationIsComplete()
throws Exception {
final LogVerificationAppender appender = new LogVerificationAppender();
final org.apache.log4j.Logger logger = org.apache.log4j.Logger
.getRootLogger();
logger.addAppender(appender);
// Transition 2 to observer
cluster.transitionToObserver(2);
doEdits(0, 10);
    // After a rollEditLog, the standby (nn1)'s next checkpoint will be
    // ahead of the observer (nn2).
nns[0].getRpcServer().rollEditLog();
NameNode nn2 = nns[2];
FSImage nnFSImage = NameNodeAdapter.getAndSetFSImageInHttpServer(nn2, null);
    // After the standby creates a checkpoint, it will try to push the
    // image to the active and all observers, updating its own txid to
    // the most recent.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
NameNodeAdapter.getAndSetFSImageInHttpServer(nn2, nnFSImage);
cluster.transitionToStandby(2);
logger.removeAppender(appender);
for (LoggingEvent event : appender.getLog()) {
String message = event.getRenderedMessage();
if (message.contains("PutImage failed") &&
message.contains("FSImage has not been set in the NameNode.")) {
        // Logs have the expected exception.
return;
}
}
fail("Expected exception not present in logs.");
}
/**
* Test for the case when the SBN is configured to checkpoint based
* on a time period, but no transactions are happening on the
* active. Thus, it would want to save a second checkpoint at the
* same txid, which is a no-op. This test makes sure this doesn't
* cause any problem.
*/
@Test(timeout = 300000)
public void testCheckpointWhenNoNewTransactionsHappened()
throws Exception {
// Checkpoint as fast as we can, in a tight loop.
cluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
cluster.restartNameNode(1);
nns[1] = cluster.getNameNode(1);
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
// We shouldn't save any checkpoints at txid=0
Thread.sleep(1000);
Mockito.verify(spyImage1, Mockito.never())
.saveNamespace(any());
// Roll the primary and wait for the standby to catch up
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
Thread.sleep(2000);
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1)).saveNamespace(
any(), Mockito.eq(NameNodeFile.IMAGE), any());
}
/**
* Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint.
*/
@Test(timeout=120000)
public void testCheckpointCancellation() throws Exception {
cluster.transitionToStandby(0);
// Create an edit log in the shared edits dir with a lot
// of mkdirs operations. This is solely so that the image is
// large enough to take a non-trivial amount of time to load.
// (only ~15MB)
URI sharedUri = cluster.getSharedEditsDir(0, 1);
File sharedDir = new File(sharedUri.getPath(), "current");
File tmpDir = new File(MiniDFSCluster.getBaseDirectory(),
"testCheckpointCancellation-tmp");
FSNamesystem fsn = cluster.getNamesystem(0);
FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, NUM_DIRS_IN_LOG, 3,
fsn.getFSDirectory().getLastInodeId() + 1);
String fname = NNStorage.getInProgressEditsFileName(3);
new File(tmpDir, fname).renameTo(new File(sharedDir, fname));
// Checkpoint as fast as we can, in a tight loop.
cluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
cluster.restartNameNode(1);
nns[1] = cluster.getNameNode(1);
cluster.transitionToActive(0);
boolean canceledOne = false;
for (int i = 0; i < 10 && !canceledOne; i++) {
doEdits(i*10, i*10 + 10);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
}
assertTrue(canceledOne);
}
/**
* Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint during image upload from standby to active NN.
*/
@Test(timeout=60000)
public void testCheckpointCancellationDuringUpload() throws Exception {
    // Set dfs.namenode.checkpoint.txns differently on the first NN, to
    // prevent it from checkpointing when it becomes a standby.
cluster.getConfiguration(0).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);
// don't compress, we want a big image
for (int i = 0; i < NUM_NNS; i++) {
cluster.getConfiguration(i).setBoolean(
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
}
// Throttle SBN upload to make it hang during upload to ANN
for (int i = 1; i < NUM_NNS; i++) {
cluster.getConfiguration(i).setLong(
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
}
for (int i = 0; i < NUM_NNS; i++) {
cluster.restartNameNode(i);
}
// update references to each of the nns
setNNs();
cluster.transitionToActive(0);
doEdits(0, 100);
for (int i = 1; i < NUM_NNS; i++) {
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
}
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
    // Wait to make sure the background TransferFsImageUpload thread was
    // cancelled. This needs to be done before the next test in the suite
    // starts, so that a file descriptor is not held open during the next
    // cluster init.
cluster.shutdown();
cluster = null;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threads = threadBean.getThreadInfo(
threadBean.getAllThreadIds(), 1);
for (ThreadInfo thread: threads) {
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
return false;
}
}
return true;
}
}, 1000, 30000);
// Assert that former active did not accept the canceled checkpoint file.
assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
}
/**
* Make sure that clients will receive StandbyExceptions even when a
   * checkpoint is in progress on the SBN, and therefore the
   * StandbyCheckpointer thread will hold the FSNS lock. Regression test
   * for HDFS-4591.
*/
@Test(timeout=300000)
public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
// Set it up so that we know when the SBN checkpoint starts and ends.
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
DelayAnswer answerer = new DelayAnswer(LOG);
Mockito.doAnswer(answerer).when(spyImage1)
.saveNamespace(any(FSNamesystem.class),
Mockito.eq(NameNodeFile.IMAGE), any(Canceler.class));
// Perform some edits and wait for a checkpoint to start on the SBN.
doEdits(0, 1000);
nns[0].getRpcServer().rollEditLog();
answerer.waitForCall();
assertTrue("SBN is not performing checkpoint but it should be.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
// Make sure that the lock has actually been taken by the checkpointing
// thread.
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
try {
// Perform an RPC to the SBN and make sure it throws a StandbyException.
nns[1].getRpcServer().getFileInfo("/");
fail("Should have thrown StandbyException, but instead succeeded.");
} catch (StandbyException se) {
GenericTestUtils.assertExceptionContains("is not supported", se);
}
// Make sure new incremental block reports are processed during
// checkpointing on the SBN.
assertEquals(0, cluster.getNamesystem(1).getPendingDataNodeMessageCount());
doCreate();
Thread.sleep(1000);
assertTrue(cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0);
// Make sure that the checkpoint is still going on, implying that the client
// RPC to the SBN happened during the checkpoint.
assertTrue("SBN should have still been checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
answerer.proceed();
answerer.waitForResult();
assertTrue("SBN should have finished checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
}
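  /**
   * Test that reads (RPCs and the web UI) are still served by the SBN
   * while a checkpoint is in progress, even though an operation that
   * needs the write lock queues up behind the checkpointer.
   */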
@Test(timeout=300000)
public void testReadsAllowedDuringCheckpoint() throws Exception {
// Set it up so that we know when the SBN checkpoint starts and ends.
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
DelayAnswer answerer = new DelayAnswer(LOG);
Mockito.doAnswer(answerer).when(spyImage1)
.saveNamespace(any(FSNamesystem.class),
any(NameNodeFile.class),
any(Canceler.class));
// Perform some edits and wait for a checkpoint to start on the SBN.
doEdits(0, 1000);
nns[0].getRpcServer().rollEditLog();
answerer.waitForCall();
assertTrue("SBN is not performing checkpoint but it should be.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
// Make sure that the lock has actually been taken by the checkpointing
// thread.
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
// Perform an RPC that needs to take the write lock.
Thread t = new Thread() {
@Override
public void run() {
try {
nns[1].getRpcServer().restoreFailedStorage("false");
} catch (IOException e) {
e.printStackTrace();
}
}
};
t.start();
// Make sure that our thread is waiting for the lock.
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
assertFalse(nns[1].getNamesystem().getFsLockForTests().hasQueuedThreads());
assertFalse(nns[1].getNamesystem().getFsLockForTests().isWriteLocked());
assertTrue(nns[1].getNamesystem().getCpLockForTests().hasQueuedThreads());
// Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
// be taken.
String pageContents = DFSTestUtil.urlGet(new URL("http://" +
nns[1].getHttpAddress().getHostName() + ":" +
nns[1].getHttpAddress().getPort() + "/jmx"));
assertTrue(pageContents.contains("NumLiveDataNodes"));
// Make sure that the checkpoint is still going on, implying that the client
// RPC to the SBN happened during the checkpoint.
assertTrue("SBN should have still been checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
answerer.proceed();
answerer.waitForResult();
assertTrue("SBN should have finished checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
t.join();
}
/**
   * Test that standby NNs can still upload an FSImage to the ANN after
   * becoming non-primary standby NNs. Regression test for HDFS-9787.
*/
@Test(timeout=300000)
public void testNonPrimarySBNUploadFSImage() throws Exception {
// Shutdown all standby NNs.
for (int i = 1; i < NUM_NNS; i++) {
cluster.shutdownNameNode(i);
// Checkpoint as fast as we can, in a tight loop.
cluster.getConfiguration(i).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
}
doEdits(0, 10);
cluster.transitionToStandby(0);
    // Restart the standby NNs so that they checkpoint while no active NN
    // is available.
for (int i = 1; i < NUM_NNS; i++) {
cluster.restartNameNode(i, false);
}
cluster.waitClusterUp();
for (int i = 0; i < NUM_NNS; i++) {
// Once the standby catches up, it should do a checkpoint
// and save to local directories.
HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(12));
}
cluster.transitionToActive(0);
    // Wait 2 seconds for the last upload time to expire.
Thread.sleep(2000);
doEdits(11, 20);
nns[0].getRpcServer().rollEditLog();
// One of standby NNs should also upload it back to the active.
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(23));
}
/**
* Test that checkpointing is still successful even if an issue
* was encountered while writing the legacy OIV image.
*/
@Test(timeout=300000)
public void testCheckpointSucceedsWithLegacyOIVException() throws Exception {
// Delete the OIV image dir to cause an IOException while saving
FileUtil.fullyDelete(tmpOivImgDir);
doEdits(0, 10);
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
// Once the standby catches up, it should notice that it needs to
// do a checkpoint and save one to its local directories.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
// It should also upload it back to the active.
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
}
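  /** Create directories {@code /testN} for each N in [start, stop). */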
private void doEdits(int start, int stop) throws IOException {
for (int i = start; i < stop; i++) {
Path p = new Path("/test" + i);
fs.mkdirs(p);
}
}
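  /** Create a small single-replica file, deleting any existing copy first. */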
private void doCreate() throws IOException {
Path p = new Path("/testFile");
fs.delete(p, false);
FSDataOutputStream out = fs.create(p, (short)1);
out.write(42);
out.close();
}
/**
* A codec which just slows down the saving of the image significantly
* by sleeping a few milliseconds on every write. This makes it easy to
* catch the standby in the middle of saving a checkpoint.
*/
public static class SlowCodec extends GzipCodec {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
CompressionOutputStream ret = super.createOutputStream(out);
CompressionOutputStream spy = Mockito.spy(ret);
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5))
.when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
return spy;
}
}
}