RATIS-1960. Follower may be incorrectly marked as having caught up (#983)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 565cb11..c6983e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -500,16 +500,25 @@
         peersToBootStrap, listenersToBootStrap, new PeerConfiguration(peersInNewConf, listenersInNewConf));
     Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers();
     Collection<RaftPeer> newListeners = configurationStagingState.getNewListeners();
-    // set the staging state
-    this.stagingState = configurationStagingState;
+    Collection<RaftPeer> allNew = newListeners.isEmpty()
+        ? newPeers
+        : newPeers.isEmpty()
+            ? newListeners
+            : Stream.concat(newPeers.stream(), newListeners.stream())
+                .collect(Collectors.toList());
 
-    if (newPeers.isEmpty() && newListeners.isEmpty()) {
-      applyOldNewConf();
+    if (allNew.isEmpty()) {
+      applyOldNewConf(configurationStagingState);
     } else {
       // update the LeaderState's sender list
-      addAndStartSenders(newPeers);
-      addAndStartSenders(newListeners);
+      Collection<LogAppender> newAppenders = addSenders(allNew);
+
+      // set the staging state
+      stagingState = configurationStagingState;
+
+      newAppenders.forEach(LogAppender::start);
     }
+
     return pending;
   }
 
@@ -579,14 +588,14 @@
     notifySenders();
   }
 
-  private void applyOldNewConf() {
+  private void applyOldNewConf(ConfigurationStagingState stage) {
     final ServerState state = server.getState();
     final RaftConfigurationImpl current = state.getRaftConf();
-    final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
+    final long nextIndex = state.getLog().getNextIndex();
+    final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current, nextIndex);
     // apply the (old, new) configuration to log, and use it as the current conf
     appendConfiguration(oldNewConf);
 
-    this.stagingState = null;
     notifySenders();
   }
 
@@ -607,7 +616,7 @@
   @Override
   public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
       List<LogEntryProto> entries, TermIndex previous, long callId) {
-    final boolean initializing = isCaughtUp(follower);
+    final boolean initializing = !isCaughtUp(follower);
     final RaftPeerId targetId = follower.getId();
     return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
         ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
@@ -618,9 +627,13 @@
    * Update sender list for setConfiguration request
    */
   private void addAndStartSenders(Collection<RaftPeer> newPeers) {
-    if (!newPeers.isEmpty()) {
-      addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
-    }
+    addSenders(newPeers).forEach(LogAppender::start);
+  }
+
+  private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers) {
+    return !newPeers.isEmpty()
+        ? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false)
+        : Collections.emptyList();
   }
 
   private RaftPeer getPeer(RaftPeerId id) {
@@ -811,20 +824,22 @@
     } else {
       final long commitIndex = server.getState().getLog().getLastCommittedIndex();
       // check progress for the new followers
-      final EnumSet<BootStrapProgress> reports = getLogAppenders()
+      final List<FollowerInfoImpl> laggingFollowers = getLogAppenders()
           .map(LogAppender::getFollower)
           .filter(follower -> !isCaughtUp(follower))
+          .map(FollowerInfoImpl.class::cast)
+          .collect(Collectors.toList());
+      final EnumSet<BootStrapProgress> reports = laggingFollowers.stream()
           .map(follower -> checkProgress(follower, commitIndex))
           .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
       if (reports.contains(BootStrapProgress.NOPROGRESS)) {
         stagingState.fail(BootStrapProgress.NOPROGRESS);
       } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
         // all caught up!
-        applyOldNewConf();
-        getLogAppenders()
-            .map(LogAppender::getFollower)
+        applyOldNewConf(stagingState);
+        this.stagingState = null;
+        laggingFollowers.stream()
             .filter(f -> server.getRaftConf().containsInConf(f.getId()))
-            .map(FollowerInfoImpl.class::cast)
             .forEach(FollowerInfoImpl::catchUp);
       }
     }