RATIS-1902. The snapshot index is set incorrectly in InstallSnapshotReplyProto. (#933)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 422adf4..5c07d7f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -153,31 +153,31 @@
     return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
   }
 
+  private boolean installSnapshot() {
+    if (installSnapshotEnabled) {
+      final SnapshotInfo snapshot = shouldInstallSnapshot();
+      if (snapshot != null) {
+        installSnapshot(snapshot);
+        return true;
+      }
+    } else {
+      // installSnapshot is disabled: check whether to notify the follower to install a snapshot instead
+      final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
+      if (firstAvailable != null) {
+        notifyInstallSnapshot(firstAvailable);
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public void run() throws IOException {
-    boolean installSnapshotRequired;
     for(; isRunning(); mayWait()) {
-      installSnapshotRequired = false;
-
       //HB period is expired OR we have messages OR follower is behind with commit index
       if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
-
-        if (installSnapshotEnabled) {
-          SnapshotInfo snapshot = shouldInstallSnapshot();
-          if (snapshot != null) {
-            installSnapshot(snapshot);
-            installSnapshotRequired = true;
-          }
-        } else {
-          TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
-          if (installSnapshotNotificationTermIndex != null) {
-            installSnapshot(installSnapshotNotificationTermIndex);
-            installSnapshotRequired = true;
-          }
-        }
-
-        appendLog(installSnapshotRequired || haveTooManyPendingRequests());
-
+        final boolean installingSnapshot = installSnapshot();
+        appendLog(installingSnapshot || haveTooManyPendingRequests());
       }
       getLeaderState().checkHealth(getFollower());
     }
@@ -403,14 +403,14 @@
       }
 
       try {
-        onNextImpl(reply);
+        onNextImpl(request, reply);
       } catch(Exception t) {
         LOG.error("Failed onNext request=" + request
             + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t);
       }
     }
 
-    private void onNextImpl(AppendEntriesReplyProto reply) {
+    private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto reply) {
       errCount.set(0);
 
       if (!firstResponseReceived) {
@@ -435,8 +435,9 @@
           break;
         case INCONSISTENCY:
           grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
-          LOG.warn("{}: received {} reply with nextIndex {}", this, reply.getResult(), reply.getNextIndex());
-          updateNextIndex(Math.max(getFollower().getMatchIndex() + 1, reply.getNextIndex()));
+          LOG.warn("{}: received {} reply with nextIndex {}, request={}",
+              this, reply.getResult(), reply.getNextIndex(), request);
+          updateNextIndex(getNextIndexForInconsistency(request.getFirstIndex(), reply.getNextIndex()));
           break;
         default:
           throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
@@ -662,7 +663,7 @@
       snapshotRequestObserver.onCompleted();
       grpcServerMetrics.onInstallSnapshot();
     } catch (Exception e) {
-      LOG.warn("{}: failed to install snapshot {}: {}", this, snapshot.getFiles(), e);
+      LOG.warn(this + ": failed to installSnapshot " + snapshot, e);
       if (snapshotRequestObserver != null) {
         snapshotRequestObserver.onError(e);
       }
@@ -684,17 +685,17 @@
   }
 
   /**
-   * Send installSnapshot request to Follower with only a notification that a snapshot needs to be installed.
-   * @param firstAvailableLogTermIndex the first available log's index on the Leader
+   * Send an installSnapshot notification request to the Follower.
+   * @param firstAvailable the term-index of the first available log on the Leader
    */
-  private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
-    LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, notify follower to install snapshot-{}",
-        this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), firstAvailableLogTermIndex);
+  private void notifyInstallSnapshot(TermIndex firstAvailable) {
+    LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}",
+        this, firstAvailable, getFollower().getNextIndex());
 
     final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
     StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
     // prepare and enqueue the notify install snapshot request.
-    final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+    final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable);
     if (LOG.isInfoEnabled()) {
       LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
     }
@@ -770,6 +771,7 @@
     private final TermIndex previousLog;
     private final int entriesCount;
 
+    private final TermIndex firstEntry;
     private final TermIndex lastEntry;
 
     private volatile Timestamp sendTime;
@@ -778,6 +780,7 @@
       this.callId = proto.getServerRequest().getCallId();
       this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
       this.entriesCount = proto.getEntriesCount();
+      this.firstEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(0)): null;
       this.lastEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(entriesCount - 1)): null;
 
       this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), isHeartbeat());
@@ -792,7 +795,11 @@
       return previousLog;
     }
 
-    public Timestamp getSendTime() {
+    long getFirstIndex() {
+      return Optional.ofNullable(firstEntry).map(TermIndex::getIndex).orElse(RaftLog.INVALID_LOG_INDEX);
+    }
+
+    Timestamp getSendTime() {
       return sendTime;
     }
 
@@ -811,10 +818,13 @@
 
     @Override
     public String toString() {
+      final String entries = entriesCount == 0? ""
+          : entriesCount == 1? ",entry=" + firstEntry
+          : ",entries=" + firstEntry + "..." + lastEntry;
       return JavaUtils.getClassSimpleName(getClass())
           + ":cid=" + callId
           + ",entriesCount=" + entriesCount
-          + ",lastEntry=" + lastEntry;
+          + entries;
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index c2ec88a..f2be8c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -25,6 +25,8 @@
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.util.Preconditions;
 
 import java.util.Collection;
 import java.util.List;
@@ -90,24 +92,22 @@
   static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId,
       long currentTerm, InstallSnapshotResult result, long installedSnapshotIndex) {
-    final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
-        replyId, isSuccess(result));
-    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
-        .newBuilder().setServerReply(rb).setTerm(currentTerm).setResult(result);
-    if (installedSnapshotIndex > 0) {
-      builder.setSnapshotIndex(installedSnapshotIndex);
-    }
-    return builder.build();
+    final boolean success = isSuccess(result);
+    Preconditions.assertTrue(success || installedSnapshotIndex == RaftLog.INVALID_LOG_INDEX,
+        () -> "result=" + result + " but installedSnapshotIndex=" + installedSnapshotIndex);
+    final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId, replyId, success);
+    return InstallSnapshotReplyProto.newBuilder()
+        .setServerReply(rb)
+        .setTerm(currentTerm)
+        .setResult(result)
+        .setSnapshotIndex(installedSnapshotIndex > 0? installedSnapshotIndex: 0)
+        .build();
   }
 
   static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId,
-      InstallSnapshotResult result) {
-    final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
-        replyId, isSuccess(result));
-    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
-        .newBuilder().setServerReply(rb).setResult(result);
-    return builder.build();
+      long currentTerm, InstallSnapshotResult result) {
+    return toInstallSnapshotReplyProto(requestorId, replyId, currentTerm, result, RaftLog.INVALID_LOG_INDEX);
   }
 
   static ReadIndexRequestProto toReadIndexRequestProto(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 334ef30..9794314 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -143,7 +143,7 @@
 
     // There is a mismatch between configurations on leader and follower.
     final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
-        leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
+        leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH);
     LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
         getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
         leaderId, request.hasSnapshotChunk(), server.getId(), installSnapshotEnabled);
@@ -209,7 +209,7 @@
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
-            currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
+            currentTerm, InstallSnapshotResult.NOT_LEADER);
         LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
         return reply;
       }
@@ -308,7 +308,7 @@
         server.getStateMachine().event().notifySnapshotInstalled(
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
-            currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
+            currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
       }
 
       // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
@@ -335,7 +335,7 @@
             InstallSnapshotResult.IN_PROGRESS);
       }
       return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
-          currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
+          currentTerm, InstallSnapshotResult.IN_PROGRESS);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index d57c46a..c685f2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -181,6 +181,22 @@
     return null;
   }
 
+  protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) {
+    long next = replyNextIndex;
+    final long i = getFollower().getMatchIndex() + 1;
+    if (i > next && i != requestFirstIndex) {
+      // Ideally, nextIndex should be greater than matchIndex, so use matchIndex + 1 when the reply asks for less.
+      // The exception is when matchIndex + 1 is the first index of the rejected request (e.g. the log is empty):
+      // resending the same first entry would only make the follower reply INCONSISTENCY again.
+      next = i;
+    }
+    if (next == requestFirstIndex && next > RaftLog.LEAST_VALID_LOG_INDEX) {
+      // Avoid resending the same first entry: step back by one, provided the result is still a valid log index.
+      next--;
+    }
+    return next;
+  }
+
   @Override
   public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
       throws RaftLogIOException {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 1efb21b..2d77519 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -49,6 +49,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -170,10 +171,18 @@
    */
   @Test
   public void testAddNewFollowers() throws Exception {
-    runWithNewCluster(1, this::testAddNewFollowers);
+    final int numRequests = SNAPSHOT_TRIGGER_THRESHOLD*2 - 1; // trigger a snapshot
+    runWithNewCluster(1, c -> testAddNewFollowers(c, numRequests));
   }
 
-  private void testAddNewFollowers(CLUSTER cluster) throws Exception {
+  @Test
+  public void testAddNewFollowersNoSnapshot() throws Exception {
+    final int numRequests = SNAPSHOT_TRIGGER_THRESHOLD/8;  // do not trigger a snapshot
+    runWithNewCluster(1, c -> testAddNewFollowers(c, numRequests));
+  }
+
+  private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Exception {
+    final boolean shouldInstallSnapshot = numRequests >= SNAPSHOT_TRIGGER_THRESHOLD;
     leaderSnapshotInfoRef.set(null);
     final List<LogSegmentPath> logs;
     int i = 0;
@@ -182,24 +191,27 @@
       final RaftPeerId leaderId = cluster.getLeader().getId();
 
       try(final RaftClient client = cluster.createClient(leaderId)) {
-        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
-          RaftClientReply
-              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+        for (; i < numRequests; i++) {
+          final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
           Assert.assertTrue(reply.isSuccess());
         }
       }
 
-      // wait for the snapshot to be done
-      final RaftServer.Division leader = cluster.getLeader();
-      final long nextIndex = leader.getRaftLog().getNextIndex();
-      LOG.info("nextIndex = {}", nextIndex);
-      final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
-          nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
-      JavaUtils.attemptRepeatedly(() -> {
-        Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
-        return null;
-      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
-      logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
+      if (shouldInstallSnapshot) {
+        // wait for the snapshot to be done
+        final RaftServer.Division leader = cluster.getLeader();
+        final long nextIndex = leader.getRaftLog().getNextIndex();
+        LOG.info("nextIndex = {}", nextIndex);
+        final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+            nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+        JavaUtils.attemptRepeatedly(() -> {
+          Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+          return null;
+        }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+        logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
+      } else {
+        logs = Collections.emptyList();
+      }
     } finally {
       cluster.shutdown();
     }
@@ -238,8 +250,9 @@
       // Check the installed snapshot index on each Follower matches with the
       // leader snapshot.
       for (RaftServer.Division follower : cluster.getFollowers()) {
-        Assert.assertEquals(leaderSnapshotInfo.getIndex(),
-            RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+        final long expected = shouldInstallSnapshot ? leaderSnapshotInfo.getIndex() : RaftLog.INVALID_LOG_INDEX;
+        Assert.assertEquals(expected, RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+        RaftSnapshotBaseTest.assertLogContent(follower, false);
       }
 
       // restart the peer and check if it can correctly handle conf change
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 7418dcb..9cce49a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -87,22 +87,28 @@
 
   public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
     final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
-    final RaftLog leaderLog = leader.getRaftLog();
-    final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
-    final LogEntryProto e = leaderLog.get(lastIndex);
+    assertLogContent(leader, true);
+  }
+
+  public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception {
+    final RaftLog log = server.getRaftLog();
+    final long lastIndex = log.getLastEntryTermIndex().getIndex();
+    final LogEntryProto e = log.get(lastIndex);
     Assert.assertTrue(e.hasMetadataEntry());
 
     JavaUtils.attemptRepeatedly(() -> {
-      Assert.assertEquals(leaderLog.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
+      Assert.assertEquals(log.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
       return null;
     }, 50, BaseTest.HUNDRED_MILLIS, "CheckMetadataEntry", LOG);
 
-    SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(leader);
-    Assert.assertTrue("Is not notified as a leader", simpleStateMachine.isNotifiedAsLeader());
+    SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(server);
+    if (isLeader) {
+      Assert.assertTrue("Not notified as a leader", simpleStateMachine.isNotifiedAsLeader());
+    }
     final LogEntryProto[] entries = simpleStateMachine.getContent();
     long message = 0;
     for (int i = 0; i < entries.length; i++) {
-      LOG.info("{}) {} {}", i, message, entries[i]);
+      LOG.info("{}) {} {}", i, message, entries[i].toString().replace("\n", ", "));
       if (entries[i].hasStateMachineLogEntry()) {
         final SimpleMessage m = new SimpleMessage("m" + message++);
         Assert.assertArrayEquals(m.getContent().toByteArray(),