RATIS-2025. Move out assert and proto methods from RaftServerImpl. (#1041)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 7390093..133cfeb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,101 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.Timekeeper;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
+import org.apache.ratis.protocol.exceptions.StaleReadException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.DivisionProperties;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
+import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
+import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.metrics.LeaderElectionMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.LifeCycle.State;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedSupplier;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.NoSuchFileException;
@@ -41,77 +136,17 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.Timekeeper;
-import org.apache.ratis.proto.RaftProtos.*;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.protocol.ClientInvocationId;
-import org.apache.ratis.protocol.GroupInfoReply;
-import org.apache.ratis.protocol.GroupInfoRequest;
-import org.apache.ratis.protocol.LeaderElectionManagementRequest;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.protocol.SnapshotManagementRequest;
-import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.protocol.exceptions.*;
-import org.apache.ratis.server.DataStreamMap;
-import org.apache.ratis.server.DivisionInfo;
-import org.apache.ratis.server.DivisionProperties;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.LeaderElection.Phase;
-import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
-import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.leader.LogAppender;
-import org.apache.ratis.server.metrics.LeaderElectionMetrics;
-import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
-import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
-import org.apache.ratis.server.protocol.RaftServerProtocol;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.raftlog.RaftLogIOException;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.server.storage.RaftStorageDirectory;
-import org.apache.ratis.server.util.ServerStringUtils;
-import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.util.CodeInjectionForTesting;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.ConcurrentUtils;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.LifeCycle.State;
-import org.apache.ratis.util.MemoizedSupplier;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ProtoUtils;
-import org.apache.ratis.util.ReferenceCountedObject;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.Timestamp;
-import org.apache.ratis.util.function.CheckedSupplier;
-
+import static org.apache.ratis.server.impl.ServerImplUtils.assertEntries;
+import static org.apache.ratis.server.impl.ServerImplUtils.assertGroup;
 import static org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex;
 import static org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto;
 import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
 import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
+import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString;
 
 class RaftServerImpl implements RaftServer.Division,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
@@ -611,46 +646,7 @@
   }
 
   RoleInfoProto getRoleInfoProto() {
-    RaftPeerRole currentRole = role.getCurrentRole();
-    RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
-        .setSelf(getPeer().getRaftPeerProto())
-        .setRole(currentRole)
-        .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
-    switch (currentRole) {
-    case CANDIDATE:
-      CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
-          .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
-      roleInfo.setCandidateInfo(candidate);
-      break;
-
-    case LISTENER:
-    case FOLLOWER:
-      final Optional<FollowerState> fs = role.getFollowerState();
-      final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
-        getRaftConf().getPeer(state.getLeaderId()),
-        fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
-      // FollowerState can be null while adding a new peer as it is not
-      // a voting member yet
-      roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
-        .setLeaderInfo(leaderInfo)
-        .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
-      break;
-
-    case LEADER:
-      role.getLeaderState().ifPresent(ls -> {
-        final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
-        ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
-            leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto(
-                f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
-        leader.setTerm(ls.getCurrentTerm());
-        roleInfo.setLeaderInfo(leader);
-      });
-      break;
-
-    default:
-      throw new IllegalStateException("incorrect role of server " + currentRole);
-    }
-    return roleInfo.build();
+    return role.buildRoleInfoProto(this);
   }
 
   synchronized void changeToCandidate(boolean forceStartLeaderElection) {
@@ -711,7 +707,7 @@
    */
   private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
     try {
-      assertGroup(request.getRequestorId(), request.getRaftGroupId());
+      assertGroup(getMemberId(), request);
     } catch (GroupMismatchException e) {
       return RetryCacheImpl.failWithException(e, entry);
     }
@@ -760,15 +756,6 @@
         getMemberId() + " is not in " + expected + ": current state is " + c), expected);
   }
 
-  void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
-    final RaftGroupId groupId = getMemberId().getGroupId();
-    if (!groupId.equals(requestorGroupId)) {
-      throw new GroupMismatchException(getMemberId()
-          + ": The group (" + requestorGroupId + ") of " + requestorId
-          + " does not match the group (" + groupId + ") of the server " + getId());
-    }
-  }
-
   /**
    * Append a transaction to the log for processing a client request.
    * Note that the given request could be different from {@link TransactionContext#getClientRequest()}
@@ -1002,8 +989,7 @@
     if (leaderId == null) {
       return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
     }
-    final ReadIndexRequestProto request =
-        ServerProtoUtils.toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
+    final ReadIndexRequestProto request = toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
     try {
       return getServerRpc().async().readIndexAsync(request);
     } catch (IOException e) {
@@ -1180,7 +1166,7 @@
 
     LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    assertGroup(getMemberId(), request);
 
     synchronized (this) {
       CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
@@ -1221,7 +1207,7 @@
   CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
     LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    assertGroup(getMemberId(), request);
 
     //TODO(liuyaolong): get the gap value from shell command
     long minGapValue = RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
@@ -1253,7 +1239,7 @@
       throws IOException {
     LOG.info("{} receive leaderElectionManagement request {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    assertGroup(getMemberId(), request);
 
     final LeaderElectionManagementRequest.Pause pause = request.getPause();
     if (pause != null) {
@@ -1272,7 +1258,7 @@
   CompletableFuture<RaftClientReply> stepDownLeaderAsync(TransferLeadershipRequest request) throws IOException {
     LOG.info("{} receive stepDown leader request {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    assertGroup(getMemberId(), request);
 
     return role.getLeaderState().map(leader -> leader.submitStepDownRequestAsync(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
@@ -1289,7 +1275,7 @@
   public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException {
     LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    assertGroup(getMemberId(), request);
 
     CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
     if (reply != null) {
@@ -1368,15 +1354,13 @@
   }
 
   /**
-   * check if the remote peer is not included in the current conf
-   * and should shutdown. should shutdown if all the following stands:
-   * 1. this is a leader
+   * The remote peer should shut down if all the following are true.
+   * 1. this is the current leader
    * 2. current conf is stable and has been committed
-   * 3. candidate id is not included in conf
-   * 4. candidate's last entry's index < conf's index
+   * 3. candidate is not in the current conf
+   * 4. candidate last entry index < conf index (the candidate was removed)
    */
-  private boolean shouldSendShutdown(RaftPeerId candidateId,
-      TermIndex candidateLastEntry) {
+  private boolean shouldSendShutdown(RaftPeerId candidateId, TermIndex candidateLastEntry) {
     return getInfo().isLeader()
         && getRaftConf().isStable()
         && getState().isConfCommitted()
@@ -1403,7 +1387,7 @@
     LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
         getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(candidateId, candidateGroupId);
+    assertGroup(getMemberId(), candidateId, candidateGroupId);
 
     boolean shouldShutdown = false;
     final RequestVoteReplyProto reply;
@@ -1430,49 +1414,16 @@
       } else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
         shouldShutdown = true;
       }
-      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
+      reply = toRequestVoteReplyProto(candidateId, getMemberId(),
           voteGranted, state.getCurrentTerm(), shouldShutdown);
       if (LOG.isInfoEnabled()) {
         LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
-            getMemberId(), phase, ServerStringUtils.toRequestVoteReplyString(reply), state);
+            getMemberId(), phase, toRequestVoteReplyString(reply), state);
       }
     }
     return reply;
   }
 
-  private void validateEntries(long expectedTerm, TermIndex previous,
-      List<LogEntryProto> entries) {
-    if (entries != null && !entries.isEmpty()) {
-      final long index0 = entries.get(0).getIndex();
-      // Check if next entry's index is 1 greater than the snapshotIndex. If yes, then
-      // we do not have to check for the existence of previous.
-      if (index0 != state.getSnapshotIndex() + 1) {
-        if (previous == null || previous.getTerm() == 0) {
-          Preconditions.assertTrue(index0 == 0,
-              "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
-              0, index0);
-        } else {
-          Preconditions.assertTrue(previous.getIndex() == index0 - 1,
-              "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
-              previous, 0, index0);
-        }
-      }
-
-      for (int i = 0; i < entries.size(); i++) {
-        LogEntryProto entry = entries.get(i);
-        final long t = entry.getTerm();
-        Preconditions.assertTrue(expectedTerm >= t,
-            "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
-            i, t, expectedTerm);
-
-        final long indexi = entry.getIndex();
-        Preconditions.assertTrue(indexi == index0 + i,
-            "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
-            i, indexi, index0);
-      }
-    }
-  }
-
   @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
       throws IOException {
@@ -1499,8 +1450,8 @@
       if (!startComplete.get()) {
         throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
       }
-      assertGroup(leaderId, leaderGroupId);
-      validateEntries(r.getLeaderTerm(), previous, r.getEntriesList());
+      assertGroup(getMemberId(), leaderId, leaderGroupId);
+      assertEntries(r, previous, state);
 
       return appendEntriesAsync(leaderId, request.getCallId(), previous, requestRef);
     } catch(Exception t) {
@@ -1519,14 +1470,12 @@
 
     final LeaderStateImpl leader = role.getLeaderState().orElse(null);
     if (leader == null) {
-      return CompletableFuture.completedFuture(
-          ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, RaftLog.INVALID_LOG_INDEX));
+      return CompletableFuture.completedFuture(toReadIndexReplyProto(peerId, getMemberId()));
     }
 
     return getReadIndex(ClientProtoUtils.toRaftClientRequest(request.getClientRequest()), leader)
-        .thenApply(index -> ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), true, index))
-        .exceptionally(throwable ->
-            ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, RaftLog.INVALID_LOG_INDEX));
+        .thenApply(index -> toReadIndexReplyProto(peerId, getMemberId(), true, index))
+        .exceptionally(throwable -> toReadIndexReplyProto(peerId, getMemberId()));
   }
 
   static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -1731,37 +1680,37 @@
     if (!request.hasLeaderLastEntry()) {
       // It should have a leaderLastEntry since there is a placeHolder entry.
       LOG.warn("{}: leaderLastEntry is missing in {}", getMemberId(), request);
-      return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+      return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
     }
 
     final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
     LOG.debug("{}: receive startLeaderElection from {} with lastEntry {}", getMemberId(), leaderId, leaderLastEntry);
 
     assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(leaderId, leaderGroupId);
+    assertGroup(getMemberId(), leaderId, leaderGroupId);
 
     synchronized (this) {
       // Check life cycle state again to avoid the PAUSING/PAUSED state.
       assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
       final boolean recognized = state.recognizeLeader("startLeaderElection", leaderId, leaderLastEntry.getTerm());
       if (!recognized) {
-        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+        return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
       }
 
       if (!getInfo().isFollower()) {
         LOG.warn("{} refused StartLeaderElectionRequest from {}, because role is:{}",
             getMemberId(), leaderId, role.getCurrentRole());
-        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+        return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
       }
 
       if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) {
         LOG.warn("{} refused StartLeaderElectionRequest from {}, because lastEntry:{} less than leaderEntry:{}",
             getMemberId(), leaderId, leaderLastEntry, state.getLastEntry());
-        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+        return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
       }
 
       changeToCandidate(true);
-      return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
+      return toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index fe2bc96..5eb01a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -18,8 +18,14 @@
 
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
+import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
+import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.Timestamp;
@@ -32,6 +38,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto;
+
 /**
  * Maintain the Role of a Raft Peer.
  */
@@ -141,6 +149,55 @@
     return updated;
   }
 
+  RoleInfoProto buildRoleInfoProto(RaftServerImpl server) {
+    final RaftPeerRole currentRole = getCurrentRole();
+    final RoleInfoProto.Builder proto = RoleInfoProto.newBuilder()
+        .setSelf(server.getPeer().getRaftPeerProto())
+        .setRole(currentRole)
+        .setRoleElapsedTimeMs(getRoleElapsedTimeMs());
+
+    switch (currentRole) {
+      case LEADER:
+        getLeaderState().ifPresent(leader -> {
+          final LeaderInfoProto.Builder b = LeaderInfoProto.newBuilder()
+              .setTerm(leader.getCurrentTerm());
+          leader.getLogAppenders()
+              .map(LogAppender::getFollower)
+              .map(f -> toServerRpcProto(f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs()))
+              .forEach(b::addFollowerInfo);
+          proto.setLeaderInfo(b);
+        });
+        return proto.build();
+
+      case CANDIDATE:
+        return proto.setCandidateInfo(CandidateInfoProto.newBuilder()
+            .setLastLeaderElapsedTimeMs(server.getState().getLastLeaderElapsedTimeMs()))
+            .build();
+
+      case LISTENER:
+      case FOLLOWER:
+        // FollowerState can be null while adding a new peer as it is not a voting member yet
+        final FollowerState follower = getFollowerState().orElse(null);
+        final long rpcElapsed;
+        final int outstandingOp;
+        if (follower != null) {
+          rpcElapsed = follower.getLastRpcTime().elapsedTimeMs();
+          outstandingOp = follower.getOutstandingOp();
+        } else {
+          rpcElapsed = 0;
+          outstandingOp = 0;
+        }
+        final RaftPeer leader = server.getRaftConf().getPeer(server.getState().getLeaderId());
+        return proto.setFollowerInfo(FollowerInfoProto.newBuilder()
+            .setLeaderInfo(toServerRpcProto(leader, rpcElapsed))
+            .setOutstandingOp(outstandingOp))
+            .build();
+
+      default:
+        throw new IllegalStateException("Unexpected role " + currentRole);
+    }
+  }
+
   @Override
   public String toString() {
     return String.format("%9s", role);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index e4fe8f2..e26c6e0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,9 +19,15 @@
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -35,7 +41,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /** Server utilities for internal use. */
@@ -88,7 +93,51 @@
   }
 
   static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
-    final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
-    return Math.min(leaderCommitIndex, p + numAppendEntries);
+    final long previous = followerPrevious != null? followerPrevious.getIndex() : RaftLog.LEAST_VALID_LOG_INDEX;
+    return Math.min(leaderCommitIndex, previous + numAppendEntries);
+  }
+
+  static void assertGroup(RaftGroupMemberId serverMemberId, RaftClientRequest request) throws GroupMismatchException {
+    assertGroup(serverMemberId, request.getRequestorId(), request.getRaftGroupId());
+  }
+
+  static void assertGroup(RaftGroupMemberId localMemberId, Object remoteId, RaftGroupId remoteGroupId)
+      throws GroupMismatchException {
+    final RaftGroupId localGroupId = localMemberId.getGroupId();
+    if (!localGroupId.equals(remoteGroupId)) {
+      throw new GroupMismatchException(localMemberId
+          + ": The group (" + remoteGroupId + ") of remote " + remoteId
+          + " does not match the group (" + localGroupId + ") of local " + localMemberId.getPeerId());
+    }
+  }
+
+  static void assertEntries(AppendEntriesRequestProto proto, TermIndex previous, ServerState state) {
+    final List<LogEntryProto> entries = proto.getEntriesList();
+    if (entries != null && !entries.isEmpty()) {
+      final long index0 = entries.get(0).getIndex();
+      // Check if next entry's index is 1 greater than the snapshotIndex. If yes, then
+      // we do not have to check for the existence of previous.
+      if (index0 != state.getSnapshotIndex() + 1) {
+        final long expected = previous == null || previous.getTerm() == 0 ? 0 : previous.getIndex() + 1;
+        Preconditions.assertTrue(index0 == expected,
+            "Unexpected Index: previous is %s but entries[%s].getIndex() == %s != %s",
+            previous, 0, index0, expected);
+      }
+
+      final long leaderTerm = proto.getLeaderTerm();
+      for (int i = 0; i < entries.size(); i++) {
+        final LogEntryProto entry = entries.get(i);
+        final long entryTerm = entry.getTerm();
+        Preconditions.assertTrue(entryTerm <= leaderTerm ,
+            "Unexpected Term: entries[%s].getTerm() == %s > leaderTerm == %s",
+            i, entryTerm, leaderTerm);
+
+        final long indexI = entry.getIndex();
+        final long expected = index0 + i;
+        Preconditions.assertTrue(indexI == expected,
+            "Unexpected Index: entries[0].getIndex() == %s but entries[%s].getIndex() == %s != %s",
+            index0, i, indexI, expected);
+      }
+    }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f2be8c6..e35cb23 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -126,6 +126,10 @@
         .build();
   }
 
+  static ReadIndexReplyProto toReadIndexReplyProto(RaftPeerId requestorId, RaftGroupMemberId replyId) {
+    return toReadIndexReplyProto(requestorId, replyId, false, RaftLog.INVALID_LOG_INDEX);
+  }
+
   @SuppressWarnings("parameternumber")
   static AppendEntriesReplyProto toAppendEntriesReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 7aae944..3e5ac2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -113,7 +113,7 @@
     CodeInjectionForTesting.execute(RaftServerImpl.INSTALL_SNAPSHOT, server.getId(), leaderId, request);
 
     server.assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
-    server.assertGroup(leaderId, leaderGroupId);
+    ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId);
 
     InstallSnapshotReplyProto reply = null;
     // Check if install snapshot from Leader is enabled