RATIS-2025. Move out assert and proto methods from RaftServerImpl. (#1041)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 7390093..133cfeb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,101 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.Timekeeper;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
+import org.apache.ratis.protocol.exceptions.StaleReadException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.DivisionProperties;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
+import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
+import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.metrics.LeaderElectionMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.LifeCycle.State;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedSupplier;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
@@ -41,77 +136,17 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.Timekeeper;
-import org.apache.ratis.proto.RaftProtos.*;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.protocol.ClientInvocationId;
-import org.apache.ratis.protocol.GroupInfoReply;
-import org.apache.ratis.protocol.GroupInfoRequest;
-import org.apache.ratis.protocol.LeaderElectionManagementRequest;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.protocol.SnapshotManagementRequest;
-import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.protocol.exceptions.*;
-import org.apache.ratis.server.DataStreamMap;
-import org.apache.ratis.server.DivisionInfo;
-import org.apache.ratis.server.DivisionProperties;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.LeaderElection.Phase;
-import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
-import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.leader.LogAppender;
-import org.apache.ratis.server.metrics.LeaderElectionMetrics;
-import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
-import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
-import org.apache.ratis.server.protocol.RaftServerProtocol;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.raftlog.RaftLogIOException;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
-import org.apache.ratis.server.util.ServerStringUtils;
-import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.util.CodeInjectionForTesting;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.ConcurrentUtils;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.LifeCycle.State;
-import org.apache.ratis.util.MemoizedSupplier;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ProtoUtils;
-import org.apache.ratis.util.ReferenceCountedObject;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.Timestamp;
-import org.apache.ratis.util.function.CheckedSupplier;
-
+import static org.apache.ratis.server.impl.ServerImplUtils.assertEntries;
+import static org.apache.ratis.server.impl.ServerImplUtils.assertGroup;
import static org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex;
import static org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto;
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
+import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString;
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
@@ -611,46 +646,7 @@
}
RoleInfoProto getRoleInfoProto() {
- RaftPeerRole currentRole = role.getCurrentRole();
- RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
- .setSelf(getPeer().getRaftPeerProto())
- .setRole(currentRole)
- .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
- switch (currentRole) {
- case CANDIDATE:
- CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
- .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
- roleInfo.setCandidateInfo(candidate);
- break;
-
- case LISTENER:
- case FOLLOWER:
- final Optional<FollowerState> fs = role.getFollowerState();
- final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
- getRaftConf().getPeer(state.getLeaderId()),
- fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
- // FollowerState can be null while adding a new peer as it is not
- // a voting member yet
- roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
- .setLeaderInfo(leaderInfo)
- .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
- break;
-
- case LEADER:
- role.getLeaderState().ifPresent(ls -> {
- final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
- ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
- leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto(
- f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
- leader.setTerm(ls.getCurrentTerm());
- roleInfo.setLeaderInfo(leader);
- });
- break;
-
- default:
- throw new IllegalStateException("incorrect role of server " + currentRole);
- }
- return roleInfo.build();
+ return role.buildRoleInfoProto(this);
}
synchronized void changeToCandidate(boolean forceStartLeaderElection) {
@@ -711,7 +707,7 @@
*/
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
try {
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
} catch (GroupMismatchException e) {
return RetryCacheImpl.failWithException(e, entry);
}
@@ -760,15 +756,6 @@
getMemberId() + " is not in " + expected + ": current state is " + c), expected);
}
- void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
- final RaftGroupId groupId = getMemberId().getGroupId();
- if (!groupId.equals(requestorGroupId)) {
- throw new GroupMismatchException(getMemberId()
- + ": The group (" + requestorGroupId + ") of " + requestorId
- + " does not match the group (" + groupId + ") of the server " + getId());
- }
- }
-
/**
* Append a transaction to the log for processing a client request.
* Note that the given request could be different from {@link TransactionContext#getClientRequest()}
@@ -1002,8 +989,7 @@
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
}
- final ReadIndexRequestProto request =
- ServerProtoUtils.toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
+ final ReadIndexRequestProto request = toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
try {
return getServerRpc().async().readIndexAsync(request);
} catch (IOException e) {
@@ -1180,7 +1166,7 @@
LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
synchronized (this) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
@@ -1221,7 +1207,7 @@
CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
//TODO(liuyaolong): get the gap value from shell command
long minGapValue = RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
@@ -1253,7 +1239,7 @@
throws IOException {
LOG.info("{} receive leaderElectionManagement request {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
final LeaderElectionManagementRequest.Pause pause = request.getPause();
if (pause != null) {
@@ -1272,7 +1258,7 @@
CompletableFuture<RaftClientReply> stepDownLeaderAsync(TransferLeadershipRequest request) throws IOException {
LOG.info("{} receive stepDown leader request {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
return role.getLeaderState().map(leader -> leader.submitStepDownRequestAsync(request))
.orElseGet(() -> CompletableFuture.completedFuture(
@@ -1289,7 +1275,7 @@
public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException {
LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
@@ -1368,15 +1354,13 @@
}
/**
- * check if the remote peer is not included in the current conf
- * and should shutdown. should shutdown if all the following stands:
- * 1. this is a leader
+ * The remote peer should shut down if all the following are true.
+ * 1. this is the current leader
* 2. current conf is stable and has been committed
- * 3. candidate id is not included in conf
- * 4. candidate's last entry's index < conf's index
+ * 3. candidate is not in the current conf
+ * 4. candidate last entry index < conf index (the candidate was removed)
*/
- private boolean shouldSendShutdown(RaftPeerId candidateId,
- TermIndex candidateLastEntry) {
+ private boolean shouldSendShutdown(RaftPeerId candidateId, TermIndex candidateLastEntry) {
return getInfo().isLeader()
&& getRaftConf().isStable()
&& getState().isConfCommitted()
@@ -1403,7 +1387,7 @@
LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(candidateId, candidateGroupId);
+ assertGroup(getMemberId(), candidateId, candidateGroupId);
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
@@ -1430,49 +1414,16 @@
} else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
shouldShutdown = true;
}
- reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
+ reply = toRequestVoteReplyProto(candidateId, getMemberId(),
voteGranted, state.getCurrentTerm(), shouldShutdown);
if (LOG.isInfoEnabled()) {
LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
- getMemberId(), phase, ServerStringUtils.toRequestVoteReplyString(reply), state);
+ getMemberId(), phase, toRequestVoteReplyString(reply), state);
}
}
return reply;
}
- private void validateEntries(long expectedTerm, TermIndex previous,
- List<LogEntryProto> entries) {
- if (entries != null && !entries.isEmpty()) {
- final long index0 = entries.get(0).getIndex();
- // Check if next entry's index is 1 greater than the snapshotIndex. If yes, then
- // we do not have to check for the existence of previous.
- if (index0 != state.getSnapshotIndex() + 1) {
- if (previous == null || previous.getTerm() == 0) {
- Preconditions.assertTrue(index0 == 0,
- "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
- 0, index0);
- } else {
- Preconditions.assertTrue(previous.getIndex() == index0 - 1,
- "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
- previous, 0, index0);
- }
- }
-
- for (int i = 0; i < entries.size(); i++) {
- LogEntryProto entry = entries.get(i);
- final long t = entry.getTerm();
- Preconditions.assertTrue(expectedTerm >= t,
- "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
- i, t, expectedTerm);
-
- final long indexi = entry.getIndex();
- Preconditions.assertTrue(indexi == index0 + i,
- "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
- i, indexi, index0);
- }
- }
- }
-
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
throws IOException {
@@ -1499,8 +1450,8 @@
if (!startComplete.get()) {
throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
}
- assertGroup(leaderId, leaderGroupId);
- validateEntries(r.getLeaderTerm(), previous, r.getEntriesList());
+ assertGroup(getMemberId(), leaderId, leaderGroupId);
+ assertEntries(r, previous, state);
return appendEntriesAsync(leaderId, request.getCallId(), previous, requestRef);
} catch(Exception t) {
@@ -1519,14 +1470,12 @@
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader == null) {
- return CompletableFuture.completedFuture(
- ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, RaftLog.INVALID_LOG_INDEX));
+ return CompletableFuture.completedFuture(toReadIndexReplyProto(peerId, getMemberId()));
}
return getReadIndex(ClientProtoUtils.toRaftClientRequest(request.getClientRequest()), leader)
- .thenApply(index -> ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), true, index))
- .exceptionally(throwable ->
- ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, RaftLog.INVALID_LOG_INDEX));
+ .thenApply(index -> toReadIndexReplyProto(peerId, getMemberId(), true, index))
+ .exceptionally(throwable -> toReadIndexReplyProto(peerId, getMemberId()));
}
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -1731,37 +1680,37 @@
if (!request.hasLeaderLastEntry()) {
// It should have a leaderLastEntry since there is a placeHolder entry.
LOG.warn("{}: leaderLastEntry is missing in {}", getMemberId(), request);
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
LOG.debug("{}: receive startLeaderElection from {} with lastEntry {}", getMemberId(), leaderId, leaderLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(leaderId, leaderGroupId);
+ assertGroup(getMemberId(), leaderId, leaderGroupId);
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader("startLeaderElection", leaderId, leaderLastEntry.getTerm());
if (!recognized) {
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
if (!getInfo().isFollower()) {
LOG.warn("{} refused StartLeaderElectionRequest from {}, because role is:{}",
getMemberId(), leaderId, role.getCurrentRole());
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) {
LOG.warn("{} refused StartLeaderElectionRequest from {}, because lastEntry:{} less than leaderEntry:{}",
getMemberId(), leaderId, leaderLastEntry, state.getLastEntry());
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
changeToCandidate(true);
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index fe2bc96..5eb01a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -18,8 +18,14 @@
package org.apache.ratis.server.impl;
+import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
+import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
+import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.Timestamp;
@@ -32,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto;
+
/**
* Maintain the Role of a Raft Peer.
*/
@@ -141,6 +149,55 @@
return updated;
}
+ RoleInfoProto buildRoleInfoProto(RaftServerImpl server) {
+ final RaftPeerRole currentRole = getCurrentRole();
+ final RoleInfoProto.Builder proto = RoleInfoProto.newBuilder()
+ .setSelf(server.getPeer().getRaftPeerProto())
+ .setRole(currentRole)
+ .setRoleElapsedTimeMs(getRoleElapsedTimeMs());
+
+ switch (currentRole) {
+ case LEADER:
+ getLeaderState().ifPresent(leader -> {
+ final LeaderInfoProto.Builder b = LeaderInfoProto.newBuilder()
+ .setTerm(leader.getCurrentTerm());
+ leader.getLogAppenders()
+ .map(LogAppender::getFollower)
+ .map(f -> toServerRpcProto(f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs()))
+ .forEach(b::addFollowerInfo);
+ proto.setLeaderInfo(b);
+ });
+ return proto.build();
+
+ case CANDIDATE:
+ return proto.setCandidateInfo(CandidateInfoProto.newBuilder()
+ .setLastLeaderElapsedTimeMs(server.getState().getLastLeaderElapsedTimeMs()))
+ .build();
+
+ case LISTENER:
+ case FOLLOWER:
+ // FollowerState can be null while adding a new peer as it is not a voting member yet
+ final FollowerState follower = getFollowerState().orElse(null);
+ final long rpcElapsed;
+ final int outstandingOp;
+ if (follower != null) {
+ rpcElapsed = follower.getLastRpcTime().elapsedTimeMs();
+ outstandingOp = follower.getOutstandingOp();
+ } else {
+ rpcElapsed = 0;
+ outstandingOp = 0;
+ }
+ final RaftPeer leader = server.getRaftConf().getPeer(server.getState().getLeaderId());
+ return proto.setFollowerInfo(FollowerInfoProto.newBuilder()
+ .setLeaderInfo(toServerRpcProto(leader, rpcElapsed))
+ .setOutstandingOp(outstandingOp))
+ .build();
+
+ default:
+ throw new IllegalStateException("Unexpected role " + currentRole);
+ }
+ }
+
@Override
public String toString() {
return String.format("%9s", role);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index e4fe8f2..e26c6e0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,9 +19,15 @@
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
@@ -35,7 +41,6 @@
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** Server utilities for internal use. */
@@ -88,7 +93,51 @@
}
static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
- final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
- return Math.min(leaderCommitIndex, p + numAppendEntries);
+ final long previous = followerPrevious != null? followerPrevious.getIndex() : RaftLog.LEAST_VALID_LOG_INDEX;
+ return Math.min(leaderCommitIndex, previous + numAppendEntries);
+ }
+
+ static void assertGroup(RaftGroupMemberId serverMemberId, RaftClientRequest request) throws GroupMismatchException {
+ assertGroup(serverMemberId, request.getRequestorId(), request.getRaftGroupId());
+ }
+
+ static void assertGroup(RaftGroupMemberId localMemberId, Object remoteId, RaftGroupId remoteGroupId)
+ throws GroupMismatchException {
+ final RaftGroupId localGroupId = localMemberId.getGroupId();
+ if (!localGroupId.equals(remoteGroupId)) {
+ throw new GroupMismatchException(localMemberId
+ + ": The group (" + remoteGroupId + ") of remote " + remoteId
+ + " does not match the group (" + localGroupId + ") of local " + localMemberId.getPeerId());
+ }
+ }
+
+ static void assertEntries(AppendEntriesRequestProto proto, TermIndex previous, ServerState state) {
+ final List<LogEntryProto> entries = proto.getEntriesList();
+ if (entries != null && !entries.isEmpty()) {
+ final long index0 = entries.get(0).getIndex();
+ // Check if next entry's index is 1 greater than the snapshotIndex. If yes, then
+ // we do not have to check for the existence of previous.
+ if (index0 != state.getSnapshotIndex() + 1) {
+ final long expected = previous == null || previous.getTerm() == 0 ? 0 : previous.getIndex() + 1;
+ Preconditions.assertTrue(index0 == expected,
+ "Unexpected Index: previous is %s but entries[%s].getIndex() == %s != %s",
+ previous, 0, index0, expected);
+ }
+
+ final long leaderTerm = proto.getLeaderTerm();
+ for (int i = 0; i < entries.size(); i++) {
+ final LogEntryProto entry = entries.get(i);
+ final long entryTerm = entry.getTerm();
+ Preconditions.assertTrue(entryTerm <= leaderTerm ,
+ "Unexpected Term: entries[%s].getTerm() == %s > leaderTerm == %s",
+ i, entryTerm, leaderTerm);
+
+ final long indexI = entry.getIndex();
+ final long expected = index0 + i;
+ Preconditions.assertTrue(indexI == expected,
+ "Unexpected Index: entries[0].getIndex() == %s but entries[%s].getIndex() == %s != %s",
+ index0, i, indexI, expected);
+ }
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f2be8c6..e35cb23 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -126,6 +126,10 @@
.build();
}
+ static ReadIndexReplyProto toReadIndexReplyProto(RaftPeerId requestorId, RaftGroupMemberId replyId) {
+ return toReadIndexReplyProto(requestorId, replyId, false, RaftLog.INVALID_LOG_INDEX);
+ }
+
@SuppressWarnings("parameternumber")
static AppendEntriesReplyProto toAppendEntriesReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 7aae944..3e5ac2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -113,7 +113,7 @@
CodeInjectionForTesting.execute(RaftServerImpl.INSTALL_SNAPSHOT, server.getId(), leaderId, request);
server.assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
- server.assertGroup(leaderId, leaderGroupId);
+ ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId);
InstallSnapshotReplyProto reply = null;
// Check if install snapshot from Leader is enabled