RATIS-2024. Refactor appendEntries code. (#1040)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c47db14..7390093 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -108,6 +108,11 @@
 import org.apache.ratis.util.Timestamp;
 import org.apache.ratis.util.function.CheckedSupplier;
 
+import static org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto;
+import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
+import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
+
 class RaftServerImpl implements RaftServer.Division,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol {
@@ -1483,18 +1488,24 @@
       ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
     final AppendEntriesRequestProto r = requestRef.retain();
     final RaftRpcRequestProto request = r.getServerRequest();
-    final List<LogEntryProto> entries = r.getEntriesList();
     final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null;
-    final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
-
     try {
-      preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
-          previous, r.getLeaderCommit(), r.getInitializing(), entries);
-      return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
-          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef);
+      final RaftPeerId leaderId = RaftPeerId.valueOf(request.getRequestorId());
+      final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(request.getRaftGroupId());
+
+      CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, previous, r);
+
+      assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
+      if (!startComplete.get()) {
+        throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
+      }
+      assertGroup(leaderId, leaderGroupId);
+      validateEntries(r.getLeaderTerm(), previous, r.getEntriesList());
+
+      return appendEntriesAsync(leaderId, request.getCallId(), previous, requestRef);
     } catch(Exception t) {
-      LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
-      throw t;
+      LOG.error("{}: Failed appendEntries* {}", getMemberId(), toAppendEntriesRequestString(r), t);
+      throw IOUtils.asIOException(t);
     } finally {
       requestRef.release();
     }
@@ -1540,24 +1551,6 @@
     }
   }
 
-  private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
-      TermIndex previous, long leaderCommit, boolean initializing, List<LogEntryProto> entries) throws IOException {
-    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
-        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-
-    assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
-    if (!startComplete.get()) {
-      throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
-    }
-    assertGroup(leaderId, leaderGroupId);
-
-    try {
-      validateEntries(leaderTerm, previous, entries);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    }
-  }
-
   private long updateCommitInfoCache() {
     return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
   }
@@ -1566,19 +1559,15 @@
     return serverExecutor;
   }
 
-  @SuppressWarnings("checkstyle:parameternumber")
-  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
-      RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
-      List<CommitInfoProto> commitInfos, List<LogEntryProto> entries,
-      ReferenceCountedObject<?> requestRef) throws IOException {
+  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
+      TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
+    final AppendEntriesRequestProto proto = requestRef.get();
+    final List<LogEntryProto> entries = proto.getEntriesList();
     final boolean isHeartbeat = entries.isEmpty();
-    logAppendEntries(isHeartbeat,
-        () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
-            + previous + ", " + leaderCommit + ", " + initializing
-            + ", commits:" + ProtoUtils.toString(commitInfos)
-            + ", cId:" + callId
-            + ", entries: " + LogProtoUtils.toLogEntriesString(entries));
+    logAppendEntries(isHeartbeat, () -> getMemberId() + ": appendEntries* "
+        + toAppendEntriesRequestString(proto));
 
+    final long leaderTerm = proto.getLeaderTerm();
     final long currentTerm;
     final long followerCommit = state.getLog().getLastCommittedIndex();
     final Optional<FollowerState> followerState;
@@ -1586,17 +1575,12 @@
     synchronized (this) {
       // Check life cycle state again to avoid the PAUSING/PAUSED state.
       assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
+      final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm);
       if (!recognized) {
-        final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
+        return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
             leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
-            AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
-              getMemberId(), leaderId, leaderTerm, state, ServerStringUtils.toAppendEntriesReplyString(reply));
-        }
-        return CompletableFuture.completedFuture(reply);
+            AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
       }
       try {
         changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
@@ -1605,7 +1589,7 @@
       }
       state.setLeader(leaderId, "appendEntries");
 
-      if (!initializing && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
+      if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
         role.startFollowerState(this, Op.APPEND_ENTRIES);
       }
       followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
@@ -1617,12 +1601,14 @@
       //      3. There is a gap between the local log and the entries
       // In any of these scenarios, we should return an INCONSISTENCY reply
       // back to leader so that the leader can update this follower's next index.
-
-      AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries(
-          leaderId, currentTerm, followerCommit, previous, callId, isHeartbeat, entries);
-      if (inconsistencyReply != null) {
+      final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries);
+      if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
+        final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
+            leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex,
+            AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
+        LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
         followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
-        return CompletableFuture.completedFuture(inconsistencyReply);
+        return CompletableFuture.completedFuture(reply);
       }
 
       state.updateConfiguration(entries);
@@ -1631,7 +1617,7 @@
 
     final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
         : state.getLog().append(requestRef.delegate(entries));
-    commitInfos.forEach(commitInfoCache::update);
+    proto.getCommitInfosList().forEach(commitInfoCache::update);
 
     CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
     if (!isHeartbeat) {
@@ -1641,49 +1627,27 @@
         stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
       }
     }
-    return JavaUtils.allOf(futures).whenCompleteAsync(
-        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
-        serverExecutor
-    ).thenApply(v -> {
-      final AppendEntriesReplyProto reply;
-      synchronized(this) {
-        final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.size());
-        state.updateCommitIndex(commitIndex, currentTerm, false);
+
+    final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
+    final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
+    return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
+      followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
+      timer.stop();
+    }, getServerExecutor()).thenApply(v -> {
+      final boolean updated = state.updateCommitIndex(commitIndex, currentTerm, false);
+      if (updated) {
         updateCommitInfoCache();
-        final long n;
-        final long matchIndex;
-        if (!isHeartbeat) {
-          LogEntryProto requestLastEntry = entries.get(entries.size() - 1);
-          n = requestLastEntry.getIndex() + 1;
-          matchIndex = requestLastEntry.getIndex();
-        } else {
-          n = state.getLog().getNextIndex();
-          matchIndex = RaftLog.INVALID_LOG_INDEX;
-        }
-        reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
-            state.getLog().getLastCommittedIndex(), n, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
       }
-      logAppendEntries(isHeartbeat, () -> getMemberId() + ": succeeded to handle AppendEntries. Reply: "
-          + ServerStringUtils.toAppendEntriesReplyString(reply));
-      timer.stop();  // TODO: future never completes exceptionally?
+      final long nextIndex = isHeartbeat? state.getNextIndex(): matchIndex + 1;
+      final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(leaderId, getMemberId(),
+          currentTerm, updated? commitIndex : state.getLog().getLastCommittedIndex(),
+          nextIndex, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
+      logAppendEntries(isHeartbeat, () -> getMemberId()
+          + ": appendEntries* reply " + toAppendEntriesReplyString(reply));
       return reply;
     });
   }
 
-  private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
-      long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, List<LogEntryProto> entries) {
-    final long replyNextIndex = checkInconsistentAppendEntries(previous, entries);
-    if (replyNextIndex == -1) {
-      return null;
-    }
-
-    final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
-        AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
-    LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), ServerStringUtils.toAppendEntriesReplyString(reply));
-    return reply;
-  }
-
   private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
     // Check if a snapshot installation through state machine is in progress.
     final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
@@ -1714,7 +1678,7 @@
       return replyNextIndex;
     }
 
-    return -1;
+    return RaftLog.INVALID_LOG_INDEX;
   }
 
   @Override
@@ -1762,30 +1726,25 @@
     final RaftRpcRequestProto r = request.getServerRequest();
     final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
     final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
-    final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
-
     CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);
 
-    LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{},",
-        getMemberId(), leaderId, request.getLeaderLastEntry());
+    if (!request.hasLeaderLastEntry()) {
+      // It should have a leaderLastEntry since there is a placeHolder entry.
+      LOG.warn("{}: leaderLastEntry is missing in {}", getMemberId(), request);
+      return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+    }
+
+    final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
+    LOG.debug("{}: receive startLeaderElection from {} with lastEntry {}", getMemberId(), leaderId, leaderLastEntry);
 
     assertLifeCycleState(LifeCycle.States.RUNNING);
     assertGroup(leaderId, leaderGroupId);
 
     synchronized (this) {
-      // leaderLastEntry should not be null because LeaderStateImpl#start append a placeHolder entry
-      // so leader at each term should has at least one entry
-      if (leaderLastEntry == null) {
-        LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId());
-        return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
-      }
-
       // Check life cycle state again to avoid the PAUSING/PAUSED state.
       assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
-      final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm());
+      final boolean recognized = state.recognizeLeader("startLeaderElection", leaderId, leaderLastEntry.getTerm());
       if (!recognized) {
-        LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}",
-            getMemberId(), leaderId, leaderLastEntry.getTerm(), state);
         return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
       }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e21f63c..d02994e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -321,24 +321,21 @@
     Objects.requireNonNull(operation.getLogEntry());
   }
 
-  /**
-   * Check if accept the leader selfId and term from the incoming AppendEntries rpc.
-   * If accept, update the current state.
-   * @return true if the check passes
-   */
-  boolean recognizeLeader(RaftPeerId peerLeaderId, long leaderTerm) {
+  /** @return true iff the given peer id is recognized as the leader. */
+  boolean recognizeLeader(Object op, RaftPeerId peerId, long peerTerm) {
     final long current = currentTerm.get();
-    if (leaderTerm < current) {
+    if (peerTerm < current) {
+      LOG.warn("{}: Failed to recognize {} as leader for {} since peerTerm = {} < currentTerm = {}",
+          getMemberId(), peerId, op, peerTerm, current);
       return false;
     }
     final RaftPeerId curLeaderId = getLeaderId();
-    if (leaderTerm > current || curLeaderId == null) {
-      // If the request indicates a term that is greater than the current term
-      // or no leader has been set for the current term, make sure to update
-      // leader and term later
-      return true;
+    if (peerTerm == current && curLeaderId != null && !curLeaderId.equals(peerId)) {
+      LOG.warn("{}: Failed to recognize {} as leader for {} since current leader is {} (peerTerm = currentTerm = {})",
+          getMemberId(), peerId, op, curLeaderId, current);
+      return false;
     }
-    return curLeaderId.equals(peerLeaderId);
+    return true;
   }
 
   static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 9794314..7aae944 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -32,6 +32,7 @@
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.util.ServerStringUtils;
@@ -49,6 +50,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.ratis.server.impl.ServerProtoUtils.toInstallSnapshotReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto;
 import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
 
 class SnapshotInstallationHandler {
@@ -142,7 +145,7 @@
     }
 
     // There is a mismatch between configurations on leader and follower.
-    final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
+    final InstallSnapshotReplyProto failedReply = toInstallSnapshotReplyProto(
         leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH);
     LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
         getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
@@ -158,13 +161,11 @@
     final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
     final long lastIncludedIndex = lastIncluded.getIndex();
     synchronized (server) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
-        final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
-        LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId());
-        return reply;
       }
       server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
@@ -193,7 +194,7 @@
     if (snapshotChunkRequest.getDone()) {
       LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
     }
-    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+    return toInstallSnapshotReplyProto(leaderId, getMemberId(),
         currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
   }
 
@@ -205,13 +206,11 @@
         request.getNotification().getFirstAvailableTermIndex());
     final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
     synchronized (server) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
-        final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.NOT_LEADER);
-        LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
-        return reply;
       }
       server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
@@ -229,7 +228,7 @@
           inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
           LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
               InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
-          return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
+          return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
               InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
         }
 
@@ -307,7 +306,7 @@
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         server.getStateMachine().event().notifySnapshotInstalled(
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
-        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
       }
 
@@ -325,7 +324,7 @@
         server.getStateMachine().event().notifySnapshotInstalled(
             InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
         installedIndex.set(latestInstalledIndex);
-        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());
       }
 
@@ -334,7 +333,7 @@
         LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
             InstallSnapshotResult.IN_PROGRESS);
       }
-      return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+      return toInstallSnapshotReplyProto(leaderId, getMemberId(),
           currentTerm, InstallSnapshotResult.IN_PROGRESS);
     }
   }
@@ -342,7 +341,7 @@
   private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
     final RoleInfo role = server.getRole();
     final Optional<FollowerState> fs = role.getFollowerState();
-    final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(leader,
+    final ServerRpcProto leaderInfo = toServerRpcProto(leader,
         fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
     final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder()
         .setLeaderInfo(leaderInfo)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
index 25223c0..284664d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
@@ -21,16 +21,19 @@
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.util.ProtoUtils;
 
+import java.util.List;
+
 /**
  *  This class provides convenient utilities for converting Protocol Buffers messages to strings.
  *  The output strings are for information purpose only.
  *  They are concise and compact compared to the Protocol Buffers implementations of {@link Object#toString()}.
- *
+ * <p>
  *  The output messages or the output formats may be changed without notice.
  *  Callers of this class should not try to parse the output strings for any purposes.
  *  Instead, they should use the public APIs provided by Protocol Buffers.
@@ -42,12 +45,13 @@
     if (request == null) {
       return null;
     }
+    final List<LogEntryProto> entries = request.getEntriesList();
     return ProtoUtils.toString(request.getServerRequest())
         + "-t" + request.getLeaderTerm()
         + ",previous=" + TermIndex.valueOf(request.getPreviousLog())
         + ",leaderCommit=" + request.getLeaderCommit()
         + ",initializing? " + request.getInitializing()
-        + ",entries: " + LogProtoUtils.toLogEntriesShortString(request.getEntriesList());
+        + "," + (entries.isEmpty()? "HEARTBEAT" : "entries: " + LogProtoUtils.toLogEntriesShortString(entries));
   }
 
   public static String toAppendEntriesReplyString(AppendEntriesReplyProto reply) {