HDDS-4896. Need a tool to upgrade current non-HA SCM node to single node HA cluster (#1999)
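This adds an optional ozone.scm.primordial.node.id config so that init and
bootstrap commands can be issued safely on every SCM instance, and makes
scm --init re-runnable on an already-initialized node: if SCM HA is enabled
and no Ratis group exists yet for the cluster id, init now creates one via
SCMRatisServerImpl.reinitialize(). A minimal sketch of the resulting upgrade
path, mirroring the new testSCMReinitializationWithHAEnabled test below
(clusterId comes from the existing SCM's version file):

    // On the existing non-HA SCM node: turn on Ratis-based HA and
    // re-run init, which creates the missing Ratis group.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
    StorageContainerManager.scmInit(conf, clusterId);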
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index a46f396..c170b0b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -292,6 +292,20 @@
public static final String OZONE_SCM_NODE_ID_KEY =
"ozone.scm.node.id";
+  /**
+   * Optional config. When set, scm --init takes effect only on the
+   * specified node and that node ignores the scm --bootstrap command;
+   * similarly, scm --init is ignored on all non-primordial SCM nodes.
+   * With this config set, applications/admins can safely execute the
+   * init and bootstrap commands on all SCM instances, for example in
+   * Kubernetes deployments.
+   *
+   * If a cluster is upgraded from a non-ratis-based to a ratis-based SCM,
+   * scm --init needs to be re-run on the primordial node to switch it
+   * from non-ratis-based to ratis-based SCM.
+   */
+ public static final String OZONE_SCM_PRIMORDIAL_NODE_ID_KEY =
+ "ozone.scm.primordial.node.id";
// The path where datanode ID is to be written to.
// if this value is not set then container startup will fail.
public static final String OZONE_SCM_DATANODE_ID_DIR =
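A minimal sketch of wiring up the new key, mirroring the testPrimordialSCM
test at the end of this patch ("scm1" is an illustrative node id):

    // Designate scm1 as the primordial node in every SCM's configuration.
    conf.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY, "scm1");
    // scm --init is then honored only on scm1 and scm --bootstrap only on
    // the other nodes; the mismatched command logs a message and returns.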
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index 192b784..b3fc40e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -56,6 +56,16 @@
ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
}
+ public static String getPrimordialSCM(ConfigurationSource conf) {
+ return conf.get(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY);
+ }
+
+ public static boolean isPrimordialSCM(ConfigurationSource conf,
+ String selfNodeId) {
+ String primordialNode = getPrimordialSCM(conf);
+ return isSCMHAEnabled(conf) && primordialNode != null && primordialNode
+ .equals(selfNodeId);
+ }
/**
* Get a collection of all scmNodeIds for the given scmServiceId.
*/
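These helpers back the init/bootstrap gating added to StorageContainerManager
below. Note that isPrimordialSCM() also requires SCM HA to be enabled, so the
gating is a no-op on non-HA clusters. Illustrative check (selfNodeId as
resolved from SCMHANodeDetails in StorageContainerManager):

    if (SCMHAUtils.isPrimordialSCM(conf, selfNodeId)) {
      // This node owns scm --init; scm --bootstrap is ignored here.
    }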
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a59a627..0a40d73 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1920,6 +1920,22 @@
</description>
</property>
<property>
+ <name>ozone.scm.primordial.node.id</name>
+ <value></value>
+ <tag>OZONE, SCM, HA</tag>
+ <description>
+      Optional config. When set, scm --init takes effect only on the
+      specified node and that node ignores the scm --bootstrap command;
+      similarly, scm --init is ignored on all non-primordial scm nodes.
+      With this config set, applications/admins can safely execute the
+      init and bootstrap commands on all scm instances.
+
+      If a cluster is upgraded from a non-ratis-based to a ratis-based
+      SCM, scm --init needs to be re-run on the primordial node to
+      switch it from non-ratis-based to ratis-based SCM.
+ </description>
+ </property>
+ <property>
<name>ozone.scm.ratis.enable</name>
<value>false</value>
<tag>OZONE, SCM, HA, RATIS</tag>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index 4ee2ae3..4eea7a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -138,6 +138,5 @@
@Override
public void close() throws IOException {
- flush();
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 3df5f7b..4835d03 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -230,7 +230,7 @@
// Pause the State Machine so that no new transactions can be applied.
// This action also clears the SCM Double Buffer so that if there
// are any pending transactions in the buffer, they are discarded.
-
+ getRatisServer().getSCMStateMachine().pause();
} catch (Exception e) {
LOG.error("Failed to stop/ pause the services. Cannot proceed with "
+ "installing the new checkpoint.");
@@ -250,42 +250,41 @@
LOG.error("Failed to install Snapshot from {} as SCM failed to replace"
+ " DB with downloaded checkpoint. Reloading old SCM state.", e);
}
+ // Reload the DB store with the new checkpoint.
+ // Restart (unpause) the state machine and update its last applied index
+ // to the installed checkpoint's snapshot index.
+ try {
+ reloadSCMState();
+ getRatisServer().getSCMStateMachine().unpause(term, lastAppliedIndex);
+ LOG.info("Reloaded SCM state with Term: {} and Index: {}", term,
+ lastAppliedIndex);
+ } catch (Exception ex) {
+ String errorMsg =
+ "Failed to reload SCM state and instantiate services.";
+ exitManager.exitSystem(1, errorMsg, ex, LOG);
+ }
+
+ // Delete the backup DB
+ try {
+ if (dbBackup != null) {
+ FileUtils.deleteFully(dbBackup);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to delete the backup of the original DB {}",
+ dbBackup);
+ }
} else {
LOG.warn("Cannot proceed with InstallSnapshot as SCM is at TermIndex {} "
+ "and checkpoint has lower TermIndex {}. Reloading old "
+ "state of SCM.", termIndex, checkpointTrxnInfo.getTermIndex());
}
- // Reload the DB store with the new checkpoint.
- // Restart (unpause) the state machine and update its last applied index
- // to the installed checkpoint's snapshot index.
- try {
- reloadSCMState();
- getRatisServer().getSCMStateMachine().unpause(lastAppliedIndex, term);
- LOG.info("Reloaded SCM state with Term: {} and Index: {}", term,
- lastAppliedIndex);
- } catch (Exception ex) {
- String errorMsg = "Failed to reload SCM state and instantiate services.";
- exitManager.exitSystem(1, errorMsg, ex, LOG);
- }
-
- // Delete the backup DB
- try {
- if (dbBackup != null) {
- FileUtils.deleteFully(dbBackup);
- }
- } catch (Exception e) {
- LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
- }
-
if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) {
// Install Snapshot failed and old state was reloaded. Return null to
// Ratis to indicate that installation failed.
return null;
}
- // TODO: We should only return the snpashotIndex to the leader.
- // Should be fixed after RATIS-586
TermIndex newTermIndex = TermIndex.valueOf(term, lastAppliedIndex);
return newTermIndex;
}
@@ -308,7 +307,7 @@
public void shutdown() throws IOException {
if (ratisServer != null) {
ratisServer.stop();
- ratisServer.getSCMStateMachine().stop();
+ ratisServer.getSCMStateMachine().close();
grpcServer.stop();
}
}
@@ -334,7 +333,8 @@
scm.getScmMetadataStore().stop();
}
- void startServices() throws IOException {
+ @VisibleForTesting
+ public void startServices() throws IOException {
// TODO: Fix the metrics ??
final SCMMetadataStore metadataStore = scm.getScmMetadataStore();
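The reload/unpause block now runs only in the branch that actually installed
the checkpoint, so a failed or stale install leaves the state machine paused
instead of blindly reloading; the updated TestSCMInstallSnapshotWithHA
asserts exactly that. Sketch of the resulting contract (names as in this
patch):

    // Pause before swapping the DB; unpause with the checkpoint's term
    // and index only after the new state has been reloaded.
    getRatisServer().getSCMStateMachine().pause();
    // ... replace the DB with the downloaded checkpoint ...
    reloadSCMState();
    getRatisServer().getSCMStateMachine().unpause(term, lastAppliedIndex);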
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 3e6b57c..593e9c1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -17,18 +17,14 @@
package org.apache.hadoop.hdds.scm.ha;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import java.util.Optional;
+import java.util.Iterator;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -50,7 +46,6 @@
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,15 +90,48 @@
public static void initialize(String clusterId, String scmId,
SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
- RaftServer server = newRaftServer(scmId, conf)
- .setGroup(group).build();
- server.start();
- // TODO: Timeout and sleep interval should be made configurable
- waitforLeaderToBeReady(server, 60000, group);
- server.close();
+ RaftServer server = null;
+ try {
+ server = newRaftServer(scmId, conf).setGroup(group).build();
+ server.start();
+ // TODO: Timeout and sleep interval should be made configurable
+ waitForLeaderToBeReady(server, 60000, group);
+ } finally {
+ if (server != null) {
+ server.close();
+ }
+ }
}
- private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+ public static void reinitialize(String clusterId, String scmId,
+ SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+ RaftServer server = null;
+ try {
+ server = newRaftServer(scmId, conf).build();
+ RaftGroup group = null;
+ Iterator<RaftGroup> iter = server.getGroups().iterator();
+ if (iter.hasNext()) {
+ group = iter.next();
+ }
+ if (group != null && group.getGroupId()
+ .equals(buildRaftGroupId(clusterId))) {
+ LOG.info("Ratis group with group Id {} already exists.",
+ group.getGroupId());
+ return;
+ } else {
+        // Close the server instance so that any pending locks on the raft
+        // storage directory are released and initialize() can succeed.
+ server.close();
+ initialize(clusterId, scmId, details, conf);
+ }
+ } finally {
+ if (server != null) {
+ server.close();
+ }
+ }
+ }
+
+ private static void waitForLeaderToBeReady(RaftServer server, long timeout,
RaftGroup group)
throws IOException {
boolean ready;
@@ -138,6 +166,7 @@
@Override
public void start() throws IOException {
+ LOG.info("starting ratis server {}", server.getPeer().getAddress());
server.start();
}
@@ -187,6 +216,7 @@
@Override
public void stop() throws IOException {
+ LOG.info("stopping ratis server {}", server.getPeer().getAddress());
server.close();
}
@@ -241,46 +271,6 @@
}
}
- @VisibleForTesting
- public static void validateRatisGroupExists(OzoneConfiguration conf,
- String clusterId) throws IOException {
- final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
- final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
- final RaftGroupId raftGroupId = buildRaftGroupId(clusterId);
- final AtomicBoolean found = new AtomicBoolean(false);
- RaftServerConfigKeys.storageDir(properties).parallelStream().forEach(
- (dir) -> Optional.ofNullable(dir.listFiles()).map(Arrays::stream)
- .orElse(Stream.empty()).filter(File::isDirectory).forEach(sub -> {
- try {
- LOG.info("{}: found a subdirectory {}", raftGroupId, sub);
- RaftGroupId groupId = null;
- try {
- groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
- } catch (Exception e) {
- LOG.info("{}: The directory {} is not a group directory;"
- + " ignoring it. ", raftGroupId, sub.getAbsolutePath());
- }
- if (groupId != null) {
- if (groupId.equals(raftGroupId)) {
- LOG.info(
- "{} : The directory {} found a group directory for "
- + "cluster {}", raftGroupId, sub.getAbsolutePath(),
- clusterId);
- found.set(true);
- }
- }
- } catch (Exception e) {
- LOG.warn(
- raftGroupId + ": Failed to find the group directory "
- + sub.getAbsolutePath() + ".", e);
- }
- }));
- if (!found.get()) {
- throw new IOException(
- "Could not find any ratis group with id " + raftGroupId);
- }
- }
-
private static RaftGroup buildRaftGroup(SCMNodeDetails details,
String scmId, String clusterId) {
Preconditions.checkNotNull(scmId);
@@ -299,7 +289,8 @@
return group;
}
- private static RaftGroupId buildRaftGroupId(String clusterId) {
+ @VisibleForTesting
+ public static RaftGroupId buildRaftGroupId(String clusterId) {
Preconditions.checkNotNull(clusterId);
return RaftGroupId.valueOf(
UUID.fromString(clusterId.replace(OzoneConsts.CLUSTER_ID_PREFIX, "")));
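reinitialize() is what makes a repeated scm --init safe: it builds a
RaftServer only to inspect the groups already on disk, returns if the group
for this cluster id exists, and otherwise closes the server and falls
through to initialize(). Illustrative call, as wired into scmInit() later
in this patch:

    // Create the Ratis group for an already-initialized SCM if missing.
    SCMRatisServerImpl.reinitialize(clusterId, scmStorageConfig.getScmId(),
        haDetails.getLocalNodeDetails(), conf);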
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 65569b9..15332d7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -40,9 +40,12 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
@@ -111,6 +114,18 @@
transactionBuffer.getLatestSnapshot();
}
+ /**
+ * Initializes the State Machine with the given server, group and storage.
+ */
+ @Override
+ public void initialize(RaftServer server, RaftGroupId id,
+ RaftStorage raftStorage) throws IOException {
+ getLifeCycle().startAndTransition(() -> {
+ super.initialize(server, id, raftStorage);
+ storage.init(raftStorage);
+ });
+ }
+
@Override
public CompletableFuture<Message> applyTransaction(
final TransactionContext trx) {
@@ -246,7 +261,6 @@
LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
lastAppliedIndex, Time.monotonicNow() - startTime);
}
- super.takeSnapshot();
return lastAppliedIndex;
}
@@ -277,7 +291,10 @@
getLifeCycle().transition(LifeCycle.State.PAUSED);
}
- public void stop() {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ transactionBuffer.close();
HadoopExecutors.
shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
}
@@ -286,14 +303,13 @@
* lastAppliedIndex. This should be done after uploading new state to the
* StateMachine.
*/
- public void unpause(long newLastAppliedSnaphsotIndex,
- long newLastAppliedSnapShotTermIndex) {
+  public void unpause(long newLastAppliedSnapshotTerm,
+ long newLastAppliedSnapshotIndex) {
getLifeCycle().startAndTransition(() -> {
try {
transactionBuffer.init();
this.setLastAppliedTermIndex(TermIndex
- .valueOf(newLastAppliedSnapShotTermIndex,
- newLastAppliedSnaphsotIndex));
+          .valueOf(newLastAppliedSnapshotTerm, newLastAppliedSnapshotIndex));
} catch (IOException ioe) {
LOG.error("Failed to unpause ", ioe);
}
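The paused state is now recoverable from outside the state machine: the
updated TestSCMInstallSnapshotWithHA re-installs a known-good checkpoint and
unpauses with that checkpoint's term and index. Sketch (names abbreviated
from that test):

    // Recover a paused follower from a known-good checkpoint.
    HAUtils.replaceDBWithCheckpoint(trxnInfo.getTransactionIndex(),
        oldDBLocation, checkpointBackup.toPath(),
        OzoneConsts.SCM_DB_BACKUP_PREFIX);
    scmhaManager.startServices();
    sm.unpause(trxnInfo.getTerm(), trxnInfo.getTransactionIndex());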
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 80a7f3c..d7bb0bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -743,6 +743,15 @@
// The node here will try to fetch the cluster id from any of existing
// running SCM instances.
SCMHANodeDetails scmhaNodeDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
+ String primordialSCM = SCMHAUtils.getPrimordialSCM(conf);
+ String selfNodeId = scmhaNodeDetails.getLocalNodeDetails().getNodeId();
+ if (primordialSCM != null && SCMHAUtils.isPrimordialSCM(conf, selfNodeId)) {
+    LOG.info("SCM bootstrap command can only be executed on "
+        + "non-primordial SCM nodes. Primordial node: {}, self id: {}. "
+        + "Ignoring it.", primordialSCM, selfNodeId);
+ return false;
+ }
OzoneConfiguration config =
SCMHAUtils.removeSelfId(conf,
scmhaNodeDetails.getLocalNodeDetails().getNodeId());
@@ -791,6 +800,16 @@
SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
StorageState state = scmStorageConfig.getState();
final SCMHANodeDetails haDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
+ String primordialSCM = SCMHAUtils.getPrimordialSCM(conf);
+ String selfNodeId = haDetails.getLocalNodeDetails().getNodeId();
+ if (primordialSCM != null && !SCMHAUtils
+ .isPrimordialSCM(conf, selfNodeId)) {
+    LOG.info("SCM init command can only be executed on the primordial "
+        + "SCM node {}. Self id: {}. Ignoring it.",
+        primordialSCM, selfNodeId);
+ return false;
+ }
if (state != StorageState.INITIALIZED) {
try {
if (clusterId != null && !clusterId.isEmpty()) {
@@ -819,7 +838,8 @@
+ ";cid={};layoutVersion={}", scmStorageConfig.getStorageDir(),
clusterId, scmStorageConfig.getLayoutVersion());
if (SCMHAUtils.isSCMHAEnabled(conf)) {
- SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId);
+ SCMRatisServerImpl.reinitialize(clusterId, scmStorageConfig.getScmId(),
+ haDetails.getLocalNodeDetails(), conf);
}
return true;
}
@@ -1098,8 +1118,6 @@
if (jvmPauseMonitor != null) {
jvmPauseMonitor.stop();
}
- IOUtils.cleanupWithLogger(LOG, containerManager);
- IOUtils.cleanupWithLogger(LOG, pipelineManager);
try {
scmHAManager.shutdown();
@@ -1107,6 +1125,9 @@
LOG.error("SCM HA Manager stop failed", ex);
}
+ IOUtils.cleanupWithLogger(LOG, containerManager);
+ IOUtils.cleanupWithLogger(LOG, pipelineManager);
+
try {
scmMetadataStore.stop();
} catch (Exception ex) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
index 140b010..0c3fbcc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
@@ -25,7 +25,6 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -37,7 +36,6 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
-import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
public class TestSCMSnapshot {
private static MiniOzoneCluster cluster;
@@ -82,15 +80,6 @@
"index 1 {}", snapshotInfo2, snapshotInfo1),
snapshotInfo2 > snapshotInfo1);
- Table<String, TransactionInfo> trxInfo =
- scm.getScmMetadataStore().getTransactionInfoTable();
- TransactionInfo transactionInfo = trxInfo.get(TRANSACTION_INFO_KEY);
-
- Assert.assertTrue(
- "DB trx info:" + transactionInfo.getTransactionIndex()
- + ", latestSnapshotInfo:" + snapshotInfo2,
- transactionInfo.getTransactionIndex() >= snapshotInfo2);
-
cluster.restartStorageContainerManager(false);
TransactionInfo trxInfoAfterRestart =
scm.getScmHAManager().asSCMHADBTransactionBuffer().getLatestTrxInfo();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index a6bdee5..4cf2049 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -27,6 +27,7 @@
.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
import static org.junit.Assert.fail;
import org.apache.hadoop.hdds.scm.TestUtils;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
@@ -49,7 +50,11 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.Arrays;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -75,9 +80,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
-import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.ha.RatisUtil;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -100,6 +103,9 @@
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -493,7 +499,7 @@
final UUID clusterId = UUID.randomUUID();
// This will initialize SCM
StorageContainerManager.scmInit(conf, clusterId.toString());
- SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId.toString());
+ validateRatisGroupExists(conf, clusterId.toString());
}
@Test
@@ -518,6 +524,78 @@
}
}
+ public static void validateRatisGroupExists(OzoneConfiguration conf,
+ String clusterId) throws IOException {
+ final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
+ final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
+ final RaftGroupId raftGroupId =
+ SCMRatisServerImpl.buildRaftGroupId(clusterId);
+ final AtomicBoolean found = new AtomicBoolean(false);
+ RaftServerConfigKeys.storageDir(properties).parallelStream().forEach(
+ (dir) -> Optional.ofNullable(dir.listFiles()).map(Arrays::stream)
+ .orElse(Stream.empty()).filter(File::isDirectory).forEach(sub -> {
+ try {
+ LOG.info("{}: found a subdirectory {}", raftGroupId, sub);
+ RaftGroupId groupId = null;
+ try {
+ groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
+ } catch (Exception e) {
+ LOG.info("{}: The directory {} is not a group directory;"
+ + " ignoring it. ", raftGroupId, sub.getAbsolutePath());
+ }
+ if (groupId != null) {
+ if (groupId.equals(raftGroupId)) {
+ LOG.info(
+ "{} : The directory {} found a group directory for "
+ + "cluster {}", raftGroupId, sub.getAbsolutePath(),
+ clusterId);
+ found.set(true);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ raftGroupId + ": Failed to find the group directory "
+ + sub.getAbsolutePath() + ".", e);
+ }
+ }));
+ if (!found.get()) {
+ throw new IOException(
+ "Could not find any ratis group with id " + raftGroupId);
+ }
+ }
+
+  @Test
+ public void testSCMReinitializationWithHAEnabled() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, false);
+ final String path = GenericTestUtils.getTempPath(
+ UUID.randomUUID().toString());
+ Path scmPath = Paths.get(path, "scm-meta");
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+ //This will set the cluster id in the version file
+ MiniOzoneCluster cluster =
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+ cluster.waitForClusterToBeReady();
+ try {
+ final String clusterId =
+ cluster.getStorageContainerManager().getClusterId();
+      // Validate that no Ratis group pre-exists
+ try {
+ validateRatisGroupExists(conf, clusterId);
+ Assert.fail();
+ } catch (IOException ioe) {
+ // Exception is expected here
+ }
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+ // This will re-initialize SCM
+ StorageContainerManager.scmInit(conf, clusterId);
+ // Ratis group with cluster id exists now
+ validateRatisGroupExists(conf, clusterId);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
@Test
public void testSCMInitializationFailure()
throws IOException, AuthenticationException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
index c242d61..5754ff7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
@@ -18,6 +18,7 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
@@ -231,6 +232,7 @@
"logIndex is less than it's lastAppliedIndex", newTermIndex);
Assert.assertEquals(followerTermIndex,
followerSM.getLastAppliedTermIndex());
+ Assert.assertFalse(followerSM.getLifeCycleState().isPausingOrPaused());
}
@Test
@@ -240,9 +242,17 @@
// Find the inactive SCM
String followerId = getInactiveSCM(cluster).getScmId();
StorageContainerManager follower = cluster.getSCM(followerId);
+ follower.start();
+ follower.exitSafeMode();
// Do some transactions so that the log index increases
writeToIncreaseLogIndex(leaderSCM, 100);
+ File oldDBLocation =
+ follower.getScmMetadataStore().getStore().getDbLocation();
+ SCMStateMachine sm =
+ follower.getScmHAManager().getRatisServer().getSCMStateMachine();
+ TermIndex termIndex = sm.getLastAppliedTermIndex();
DBCheckpoint leaderDbCheckpoint = leaderSCM.getScmMetadataStore().getStore()
.getCheckpoint(false);
Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation();
@@ -251,6 +261,16 @@
new SCMDBDefinition());
Assert.assertNotNull(leaderCheckpointLocation);
+    // Build a backup location alongside the current DB
+ String dbBackupName =
+ "SCM_CHECKPOINT_BACKUP" + termIndex.getIndex() + "_" + System
+ .currentTimeMillis();
+ File dbDir = oldDBLocation.getParentFile();
+ File checkpointBackup = new File(dbDir, dbBackupName);
+
+ // Take a backup of the leader checkpoint
+ Files.copy(leaderCheckpointLocation.toAbsolutePath(),
+ checkpointBackup.toPath());
// Corrupt the leader checkpoint and install that on the follower. The
// operation should fail and should shutdown.
boolean delete = true;
@@ -278,6 +298,18 @@
Assert.assertTrue(logCapture.getOutput()
.contains("Failed to reload SCM state and instantiate services."));
+ Assert.assertTrue(sm.getLifeCycleState().isPausingOrPaused());
+
+ // Verify correct reloading
+ HAUtils
+ .replaceDBWithCheckpoint(leaderCheckpointTrxnInfo.getTransactionIndex(),
+ oldDBLocation, checkpointBackup.toPath(),
+ OzoneConsts.SCM_DB_BACKUP_PREFIX);
+ scmhaManager.startServices();
+ sm.unpause(leaderCheckpointTrxnInfo.getTerm(),
+ leaderCheckpointTrxnInfo.getTransactionIndex());
+ Assert.assertTrue(sm.getLastAppliedTermIndex()
+ .equals(leaderCheckpointTrxnInfo.getTermIndex()));
}
private List<ContainerInfo> writeToIncreaseLogIndex(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
index abf1e3d..1a60067 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -43,6 +44,7 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
+import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -51,7 +53,6 @@
import org.junit.rules.Timeout;
import java.io.IOException;
-import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
@@ -208,4 +209,23 @@
}
return sync;
}
+
+ @Test
+ public void testPrimordialSCM() throws Exception {
+ StorageContainerManager scm1 = cluster.getStorageContainerManagers().get(0);
+ StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1);
+ OzoneConfiguration conf1 = scm1.getConfiguration();
+ OzoneConfiguration conf2 = scm2.getConfiguration();
+ conf1.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY,
+ scm1.getSCMNodeId());
+ conf2.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY,
+ scm1.getSCMNodeId());
+ Assert.assertFalse(StorageContainerManager.scmBootstrap(conf1));
+ scm1.getScmHAManager().shutdown();
+ Assert.assertTrue(
+ StorageContainerManager.scmInit(conf1, scm1.getClusterId()));
+ Assert.assertTrue(StorageContainerManager.scmBootstrap(conf2));
+ Assert.assertFalse(
+ StorageContainerManager.scmInit(conf2, scm2.getClusterId()));
+ }
}