blob: e2513244571b502df088c35a0b722e2c28dc92b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
import org.apache.hadoop.hdds.scm.ha.SCMStateMachine;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.ExitManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.server.protocol.TermIndex;
import static org.junit.Assert.assertTrue;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Disabled;
import org.slf4j.Logger;
import org.slf4j.event.Level;
/**
 * Tests the Ratis snapshot feature in SCM.
 */
@Timeout(500)
public class TestSCMInstallSnapshotWithHA {

  private MiniOzoneHAClusterImpl cluster = null;
  private OzoneConfiguration conf;
  private String clusterId;
  private String scmId;
  private String omServiceId;
  private String scmServiceId;
  private final int numOfOMs = 1;
  private final int numOfSCMs = 3;

  private static final long SNAPSHOT_THRESHOLD = 5;
  // private static final int LOG_PURGE_GAP = 5;

  /**
   * Create a MiniOzoneCluster for testing.
   * <p>
   * The cluster is configured with {@code numOfSCMs} SCMs but only two are
   * started, so every test has one inactive SCM it can bring up later as a
   * lagging follower.
   *
   * @throws Exception if the cluster cannot be built or does not become ready
   */
  @BeforeEach
  public void init() throws Exception {
    conf = new OzoneConfiguration();
    clusterId = UUID.randomUUID().toString();
    scmId = UUID.randomUUID().toString();
    omServiceId = "om-service-test1";
    scmServiceId = "scm-service-test1";
    SCMHAConfiguration scmhaConfiguration =
        conf.getObject(SCMHAConfiguration.class);
    // scmhaConfiguration.setRaftLogPurgeEnabled(true);
    // scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
    scmhaConfiguration.setRatisSnapshotThreshold(SNAPSHOT_THRESHOLD);
    conf.setFromObject(scmhaConfiguration);

    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
        .setClusterId(clusterId)
        .setScmId(scmId)
        .setOMServiceId(omServiceId)
        .setSCMServiceId(scmServiceId)
        .setNumOfOzoneManagers(numOfOMs)
        .setNumOfStorageContainerManagers(numOfSCMs)
        .setNumOfActiveSCMs(2)
        .build();
    cluster.waitForClusterToBeReady();
  }

  /**
   * Shutdown MiniOzoneCluster.
   */
  @AfterEach
  public void shutdown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Verifies that a lagging SCM catches up by installing the leader's DB
   * checkpoint.
   * <p>
   * This test is disabled for now as there seems to be an issue with the
   * Ratis install-snapshot code. In Ratis, when a new node is added, its
   * follower state is not updated with the leader info until the node has
   * been added to the voter list. So when an install-snapshot notification
   * arrives, the leader info is not yet set and the out-of-Ratis checkpoint
   * transfer from that leader does not work.
   *
   * TODO: Fix this
   */
  @Test
  @Disabled
  public void testInstallSnapshot() throws Exception {
    // Get the leader SCM; check it before dereferencing it.
    StorageContainerManager leaderSCM = getLeader(cluster);
    Assert.assertNotNull(leaderSCM);
    String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId();

    // Find the inactive SCM
    StorageContainerManager inactiveSCM = getInactiveSCM(cluster);
    Assert.assertNotNull(inactiveSCM);
    String followerId = inactiveSCM.getScmId();
    StorageContainerManager follower = cluster.getSCM(followerId);

    // Do some transactions so that the log index increases
    List<ContainerInfo> containers = writeToIncreaseLogIndex(leaderSCM, 200);

    // Get the latest transaction info and db checkpoint from the leader SCM.
    TransactionInfo transactionInfo =
        leaderSCM.getScmHAManager().asSCMHADBTransactionBuffer()
            .getLatestTrxInfo();
    TermIndex leaderTermIndex =
        TermIndex.valueOf(transactionInfo.getTerm(),
            transactionInfo.getTransactionIndex());
    long leaderSnapshotIndex = leaderTermIndex.getIndex();
    long leaderSnapshotTerm = leaderTermIndex.getTerm();

    DBCheckpoint leaderDbCheckpoint =
        leaderSCM.getScmMetadataStore().getStore().getCheckpoint(false);

    // Start the inactive SCM.
    cluster.startInactiveSCM(followerId);

    // The recently started SCM should be lagging behind the leader.
    long followerLastAppliedIndex =
        follower.getScmHAManager().getRatisServer().getSCMStateMachine()
            .getLastAppliedTermIndex().getIndex();
    assertTrue(followerLastAppliedIndex < leaderSnapshotIndex);

    SCMHAManagerImpl scmhaManager =
        (SCMHAManagerImpl) (follower.getScmHAManager());
    // Install the leader's db checkpoint on the lagging SCM.
    scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint);

    SCMStateMachine followerStateMachine =
        follower.getScmHAManager().getRatisServer().getSCMStateMachine();
    // After the new checkpoint is installed, the follower's lastAppliedIndex
    // must be >= the snapshot index of the checkpoint. It could be greater
    // than the snapshot index if there is any conf entry from Ratis.
    followerLastAppliedIndex = followerStateMachine
        .getLastAppliedTermIndex().getIndex();
    assertTrue(followerLastAppliedIndex >= leaderSnapshotIndex);
    assertTrue(followerStateMachine
        .getLastAppliedTermIndex().getTerm() >= leaderSnapshotTerm);

    // Verify that the follower's DB contains the transactions which were
    // made while it was inactive.
    SCMMetadataStore followerMetaStore = follower.getScmMetadataStore();
    for (ContainerInfo containerInfo : containers) {
      Assert.assertNotNull(followerMetaStore.getContainerTable()
          .get(containerInfo.containerID()));
    }
  }

  /**
   * Verifies that installing a checkpoint older than the follower's last
   * applied index is rejected: {@code installCheckpoint} returns null, the
   * follower's term/index is unchanged and its state machine is not left
   * paused.
   */
  @Test
  public void testInstallOldCheckpointFailure() throws Exception {
    // Get the leader SCM
    StorageContainerManager leaderSCM = getLeader(cluster);
    Assert.assertNotNull(leaderSCM);
    String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId();

    // Find the inactive SCM and start it.
    StorageContainerManager inactiveSCM = getInactiveSCM(cluster);
    Assert.assertNotNull(inactiveSCM);
    String followerId = inactiveSCM.getScmId();
    StorageContainerManager follower = cluster.getSCM(followerId);
    cluster.startInactiveSCM(followerId);
    follower.exitSafeMode();

    DBCheckpoint leaderDbCheckpoint = leaderSCM.getScmMetadataStore().getStore()
        .getCheckpoint(false);

    SCMStateMachine leaderSM =
        leaderSCM.getScmHAManager().getRatisServer().getSCMStateMachine();
    TermIndex lastTermIndex = leaderSM.getLastAppliedTermIndex();

    SCMStateMachine followerSM =
        follower.getScmHAManager().getRatisServer().getSCMStateMachine();
    // Advance the follower 100 transactions beyond the leader's checkpoint,
    // both in its persisted transaction info and in its state machine.
    follower.getScmMetadataStore().getTransactionInfoTable()
        .put(OzoneConsts.TRANSACTION_INFO_KEY, TransactionInfo.builder()
            .setCurrentTerm(lastTermIndex.getTerm())
            .setTransactionIndex(lastTermIndex.getIndex() + 100).build());
    followerSM.notifyTermIndexUpdated(lastTermIndex.getTerm(),
        lastTermIndex.getIndex() + 100);

    GenericTestUtils.setLogLevel(SCMHAManagerImpl.getLogger(), Level.INFO);
    GenericTestUtils.LogCapturer logCapture =
        GenericTestUtils.LogCapturer.captureLogs(SCMHAManagerImpl.getLogger());
    try {
      // Install the old checkpoint on the follower. This should fail as the
      // follower is already ahead of that transactionLogIndex and the
      // state should be reloaded.
      TermIndex followerTermIndex = followerSM.getLastAppliedTermIndex();
      SCMHAManagerImpl scmhaManager =
          (SCMHAManagerImpl) (follower.getScmHAManager());
      TermIndex newTermIndex =
          scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint);

      String errorMsg = "Reloading old state of SCM";
      Assert.assertTrue(logCapture.getOutput().contains(errorMsg));
      Assert.assertNull(" installed checkpoint even though checkpoint " +
          "logIndex is less than it's lastAppliedIndex", newTermIndex);
      Assert.assertEquals(followerTermIndex,
          followerSM.getLastAppliedTermIndex());
      Assert.assertFalse(followerSM.getLifeCycleState().isPausingOrPaused());
    } finally {
      // Detach the appender so it does not leak into other tests.
      logCapture.stopCapturing();
    }
  }

  /**
   * Verifies that installing a corrupted checkpoint fails and pauses the
   * follower's state machine, and that the follower can then be recovered
   * from an intact backup of the same checkpoint.
   */
  @Test
  public void testInstallCorruptedCheckpointFailure() throws Exception {
    StorageContainerManager leaderSCM = getLeader(cluster);
    Assert.assertNotNull(leaderSCM);
    // Use the Ratis node id, as the other tests do for installCheckpoint.
    String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId();

    // Find the inactive SCM
    StorageContainerManager inactiveSCM = getInactiveSCM(cluster);
    Assert.assertNotNull(inactiveSCM);
    String followerId = inactiveSCM.getScmId();
    StorageContainerManager follower = cluster.getSCM(followerId);

    // Do some transactions so that the log index increases
    writeToIncreaseLogIndex(leaderSCM, 100);

    File oldDBLocation =
        follower.getScmMetadataStore().getStore().getDbLocation();
    SCMStateMachine sm =
        follower.getScmHAManager().getRatisServer().getSCMStateMachine();
    TermIndex termIndex = sm.getLastAppliedTermIndex();

    DBCheckpoint leaderDbCheckpoint = leaderSCM.getScmMetadataStore().getStore()
        .getCheckpoint(false);
    Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation();
    // Check the location before it is used below.
    Assert.assertNotNull(leaderCheckpointLocation);
    TransactionInfo leaderCheckpointTrxnInfo = HAUtils
        .getTrxnInfoFromCheckpoint(conf, leaderCheckpointLocation,
            new SCMDBDefinition());

    String dbBackupName =
        "SCM_CHECKPOINT_BACKUP" + termIndex.getIndex() + "_" + System
            .currentTimeMillis();
    File dbDir = oldDBLocation.getParentFile();
    File checkpointBackup = new File(dbDir, dbBackupName);

    // Take a backup of the leader checkpoint. Files.copy on a directory only
    // creates an empty directory, so the contents must be copied recursively
    // for the backup to be usable when restoring below.
    copyRecursively(leaderCheckpointLocation.toAbsolutePath().toFile(),
        checkpointBackup);

    // Corrupt the leader checkpoint by deleting every other .sst file, then
    // install it on the follower. The operation should fail and shut down.
    File[] checkpointFiles = leaderCheckpointLocation.toFile().listFiles();
    Assert.assertNotNull(checkpointFiles);
    boolean delete = true;
    for (File file : checkpointFiles) {
      if (file.getName().contains(".sst")) {
        if (delete) {
          Assert.assertTrue("Failed to delete " + file, file.delete());
        }
        delete = !delete;
      }
    }

    SCMHAManagerImpl scmhaManager =
        (SCMHAManagerImpl) (follower.getScmHAManager());
    GenericTestUtils.setLogLevel(SCMHAManagerImpl.getLogger(), Level.ERROR);
    GenericTestUtils.LogCapturer logCapture =
        GenericTestUtils.LogCapturer.captureLogs(SCMHAManagerImpl.getLogger());
    try {
      // Prevent the expected failure from terminating the test JVM.
      scmhaManager.setExitManagerForTesting(new DummyExitManager());

      scmhaManager.installCheckpoint(leaderNodeId, leaderCheckpointLocation,
          leaderCheckpointTrxnInfo);

      Assert.assertTrue(logCapture.getOutput()
          .contains("Failed to reload SCM state and instantiate services."));
    } finally {
      // Detach the appender so it does not leak into other tests.
      logCapture.stopCapturing();
    }
    Assert.assertTrue(sm.getLifeCycleState().isPausingOrPaused());

    // Verify correct reloading from the intact backup.
    HAUtils
        .replaceDBWithCheckpoint(leaderCheckpointTrxnInfo.getTransactionIndex(),
            oldDBLocation, checkpointBackup.toPath(),
            OzoneConsts.SCM_DB_BACKUP_PREFIX);
    scmhaManager.startServices();
    sm.unpause(leaderCheckpointTrxnInfo.getTerm(),
        leaderCheckpointTrxnInfo.getTransactionIndex());
    Assert.assertEquals(leaderCheckpointTrxnInfo.getTermIndex(),
        sm.getLastAppliedTermIndex());
  }

  /**
   * Allocates RATIS/THREE containers on {@code scm} until its state machine's
   * last applied index reaches {@code targetLogIndex}.
   *
   * @param scm the SCM to write through
   * @param targetLogIndex the log index to reach
   * @return the containers allocated along the way (possibly empty)
   * @throws IOException if a container allocation fails
   * @throws InterruptedException if interrupted while polling
   */
  private List<ContainerInfo> writeToIncreaseLogIndex(
      StorageContainerManager scm, long targetLogIndex)
      throws IOException, InterruptedException {
    List<ContainerInfo> containers = new ArrayList<>();
    SCMStateMachine stateMachine =
        scm.getScmHAManager().getRatisServer().getSCMStateMachine();
    long logIndex = stateMachine.getLastAppliedTermIndex().getIndex();
    while (logIndex < targetLogIndex) {
      containers.add(scm.getContainerManager()
          .allocateContainer(HddsProtos.ReplicationType.RATIS,
              HddsProtos.ReplicationFactor.THREE,
              TestSCMInstallSnapshotWithHA.class.getName()));
      // Give the allocation time to be applied before re-reading the index.
      Thread.sleep(100);
      logIndex = stateMachine.getLastAppliedTermIndex().getIndex();
    }
    return containers;
  }

  /**
   * Recursively copies {@code source} (a file or a directory tree) to
   * {@code target}.
   * <p>
   * {@code Files.copy} on a directory only creates an empty directory, so it
   * cannot back up a checkpoint directory on its own.
   *
   * @param source file or directory to copy
   * @param target destination; files beneath it must not already exist
   * @throws IOException if a directory cannot be listed or an entry copied
   */
  private static void copyRecursively(File source, File target)
      throws IOException {
    if (!source.isDirectory()) {
      Files.copy(source.toPath(), target.toPath());
      return;
    }
    Files.createDirectories(target.toPath());
    File[] children = source.listFiles();
    if (children == null) {
      throw new IOException("Failed to list contents of " + source);
    }
    for (File child : children) {
      copyRecursively(child, new File(target, child.getName()));
    }
  }

  /**
   * ExitManager that only logs the exit request instead of terminating the
   * process, so a failed checkpoint install can be asserted on.
   */
  private static class DummyExitManager extends ExitManager {
    @Override
    public void exitSystem(int status, String message, Throwable throwable,
        Logger log) {
      log.error("System Exit: {}", message, throwable);
    }
  }

  /**
   * Returns the first SCM in {@code impl} that reports itself as leader.
   *
   * @param impl the HA mini cluster
   * @return the leader SCM, or null if none reports leadership
   */
  static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
    for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
      if (scm.checkLeader()) {
        return scm;
      }
    }
    return null;
  }

  /**
   * Returns the first SCM in {@code impl} that is not active.
   *
   * @param impl the HA mini cluster
   * @return an inactive SCM, or null if all SCMs are active
   */
  static StorageContainerManager getInactiveSCM(MiniOzoneHAClusterImpl impl) {
    for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
      if (!impl.isSCMActive(scm.getScmId())) {
        return scm;
      }
    }
    return null;
  }
}