HDDS-4922. refactor code in SCMStateMachine. (#2007)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
index c75f4e5..ec4c0e1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
@@ -59,7 +59,7 @@
this.transactionIndex = transactionIndex;
}
- public boolean isInitialized() {
+ public boolean isDefault() {
return transactionIndex == -1 && term == 0;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 593e9c1..39d9071 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -161,7 +161,7 @@
RatisUtil.newRaftProperties(haConf, conf);
return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
.setProperties(serverProperties)
- .setStateMachine(new SCMStateMachine(false));
+ .setStateMachine(new SCMStateMachine());
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 15332d7..a1047b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -27,7 +27,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,47 +60,46 @@
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.SCM_NOT_INITIALIZED;
/**
- * TODO.
+ * The SCMStateMachine is the state machine for SCMRatisServer. It is
+ * responsible for applying ratis committed transactions to
+ * {@link StorageContainerManager}.
*/
public class SCMStateMachine extends BaseStateMachine {
private static final Logger LOG =
LoggerFactory.getLogger(SCMStateMachine.class);
private StorageContainerManager scm;
- private SCMRatisServer ratisServer;
private Map<RequestType, Object> handlers;
private SCMHADBTransactionBuffer transactionBuffer;
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
- private final AtomicBoolean isInitialized;
+ private final boolean isInitialized;
private ExecutorService installSnapshotExecutor;
public SCMStateMachine(final StorageContainerManager scm,
final SCMRatisServer ratisServer, SCMHADBTransactionBuffer buffer)
throws SCMException {
this.scm = scm;
- this.ratisServer = ratisServer;
this.handlers = new EnumMap<>(RequestType.class);
this.transactionBuffer = buffer;
- TransactionInfo latestTrxInfo =
- this.transactionBuffer.getLatestTrxInfo();
- if (!latestTrxInfo.isInitialized()) {
- if (!updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
- latestTrxInfo.getTransactionIndex())) {
- throw new SCMException(
- String.format("Failed to update LastAppliedTermIndex " +
- "in StateMachine to term:{} index:{}",
- latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
- ), SCM_NOT_INITIALIZED);
- }
+ TransactionInfo latestTrxInfo = this.transactionBuffer.getLatestTrxInfo();
+ if (!latestTrxInfo.isDefault() &&
+ !updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
+ latestTrxInfo.getTransactionIndex())) {
+ throw new SCMException(
+ String.format("Failed to update LastAppliedTermIndex " +
+ "in StateMachine to term:{} index:{}",
+ latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
+ ), SCM_NOT_INITIALIZED);
}
this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
- isInitialized = new AtomicBoolean(true);
+ isInitialized = true;
}
- public SCMStateMachine(boolean init) {
- isInitialized = new AtomicBoolean(init);
+ public SCMStateMachine() {
+ isInitialized = false;
}
+
public void registerHandler(RequestType type, Object handler) {
handlers.put(type, handler);
}
@@ -109,9 +107,8 @@
@Override
public SnapshotInfo getLatestSnapshot() {
// Transaction buffer will be null during scm initlialization phase
- return transactionBuffer == null ?
- null :
- transactionBuffer.getLatestSnapshot();
+ return transactionBuffer == null
+ ? null : transactionBuffer.getLatestSnapshot();
}
/**
@@ -173,11 +170,11 @@
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
- LOG.info("current leader SCM steps down.");
-
- if (!isInitialized.get()) {
+ if (!isInitialized) {
return;
}
+ LOG.info("current leader SCM steps down.");
+
scm.getScmContext().updateLeaderAndTerm(false, 0);
scm.getSCMServiceManager().notifyStatusChanged();
}
@@ -208,12 +205,11 @@
@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
RaftPeerId newLeaderId) {
- if (!groupMemberId.getPeerId().equals(newLeaderId)) {
- LOG.info("leader changed, yet current SCM is still follower.");
+ if (!isInitialized) {
return;
}
-
- if (!isInitialized.get()) {
+ if (!groupMemberId.getPeerId().equals(newLeaderId)) {
+ LOG.info("leader changed, yet current SCM is still follower.");
return;
}
@@ -233,34 +229,35 @@
.getDeletedBlockLog();
Preconditions.checkArgument(
deletedBlockLog instanceof DeletedBlockLogImplV2);
- ((DeletedBlockLogImplV2) deletedBlockLog)
- .onBecomeLeader();
+ ((DeletedBlockLogImplV2) deletedBlockLog).onBecomeLeader();
}
@Override
public long takeSnapshot() throws IOException {
- long startTime = Time.monotonicNow();
TermIndex lastTermIndex = getLastAppliedTermIndex();
long lastAppliedIndex = lastTermIndex.getIndex();
- if (isInitialized.get()) {
- TransactionInfo lastAppliedTrxInfo =
- TransactionInfo.fromTermIndex(lastTermIndex);
- if (transactionBuffer.getLatestTrxInfo().compareTo(lastAppliedTrxInfo)
- < 0) {
- transactionBuffer.updateLatestTrxInfo(
- TransactionInfo.builder().setCurrentTerm(lastTermIndex.getTerm())
- .setTransactionIndex(lastTermIndex.getIndex()).build());
- transactionBuffer.setLatestSnapshot(
- transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
- } else {
- lastAppliedIndex =
- transactionBuffer.getLatestTrxInfo().getTransactionIndex();
- }
- transactionBuffer.flush();
- LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
- lastAppliedIndex, Time.monotonicNow() - startTime);
+ if (!isInitialized) {
+ return lastAppliedIndex;
}
+
+ long startTime = Time.monotonicNow();
+
+ TransactionInfo latestTrxInfo = transactionBuffer.getLatestTrxInfo();
+ TransactionInfo lastAppliedTrxInfo =
+ TransactionInfo.fromTermIndex(lastTermIndex);
+
+ if (latestTrxInfo.compareTo(lastAppliedTrxInfo) < 0) {
+ transactionBuffer.updateLatestTrxInfo(lastAppliedTrxInfo);
+ transactionBuffer.setLatestSnapshot(lastAppliedTrxInfo.toSnapshotInfo());
+ } else {
+ lastAppliedIndex = latestTrxInfo.getTransactionIndex();
+ }
+
+ transactionBuffer.flush();
+
+ LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+ lastAppliedIndex, Time.monotonicNow() - startTime);
return lastAppliedIndex;
}
@@ -279,7 +276,6 @@
updateLastAppliedTermIndex(term, index);
}
-
@Override
public void notifyConfigurationChanged(long term, long index,
RaftProtos.RaftConfigurationProto newRaftConfiguration) {
@@ -293,6 +289,9 @@
@Override
public void close() throws IOException {
+ if (!isInitialized) {
+ return;
+ }
super.close();
transactionBuffer.close();
HadoopExecutors.