RATIS-1960. Follower may be incorrectly marked as having caught up (#983)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 565cb11..c6983e3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -500,16 +500,25 @@
peersToBootStrap, listenersToBootStrap, new PeerConfiguration(peersInNewConf, listenersInNewConf));
Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers();
Collection<RaftPeer> newListeners = configurationStagingState.getNewListeners();
- // set the staging state
- this.stagingState = configurationStagingState;
+ Collection<RaftPeer> allNew = newListeners.isEmpty()
+ ? newPeers
+ : newPeers.isEmpty()
+ ? newListeners
+ : Stream.concat(newPeers.stream(), newListeners.stream())
+ .collect(Collectors.toList());
- if (newPeers.isEmpty() && newListeners.isEmpty()) {
- applyOldNewConf();
+ if (allNew.isEmpty()) {
+ applyOldNewConf(configurationStagingState);
} else {
// update the LeaderState's sender list
- addAndStartSenders(newPeers);
- addAndStartSenders(newListeners);
+ Collection<LogAppender> newAppenders = addSenders(allNew);
+
+ // set the staging state
+ stagingState = configurationStagingState;
+
+ newAppenders.forEach(LogAppender::start);
}
+
return pending;
}
@@ -579,14 +588,14 @@
notifySenders();
}
- private void applyOldNewConf() {
+ private void applyOldNewConf(ConfigurationStagingState stage) {
final ServerState state = server.getState();
final RaftConfigurationImpl current = state.getRaftConf();
- final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
+ final long nextIndex = state.getLog().getNextIndex();
+ final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current, nextIndex);
// apply the (old, new) configuration to log, and use it as the current conf
appendConfiguration(oldNewConf);
- this.stagingState = null;
notifySenders();
}
@@ -607,7 +616,7 @@
@Override
public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
List<LogEntryProto> entries, TermIndex previous, long callId) {
- final boolean initializing = isCaughtUp(follower);
+ final boolean initializing = !isCaughtUp(follower);
final RaftPeerId targetId = follower.getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
@@ -618,9 +627,13 @@
* Update sender list for setConfiguration request
*/
private void addAndStartSenders(Collection<RaftPeer> newPeers) {
- if (!newPeers.isEmpty()) {
- addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
- }
+ addSenders(newPeers).forEach(LogAppender::start);
+ }
+
+ private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers) {
+ return !newPeers.isEmpty()
+ ? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false)
+ : Collections.emptyList();
}
private RaftPeer getPeer(RaftPeerId id) {
@@ -811,20 +824,22 @@
} else {
final long commitIndex = server.getState().getLog().getLastCommittedIndex();
// check progress for the new followers
- final EnumSet<BootStrapProgress> reports = getLogAppenders()
+ final List<FollowerInfoImpl> laggingFollowers = getLogAppenders()
.map(LogAppender::getFollower)
.filter(follower -> !isCaughtUp(follower))
+ .map(FollowerInfoImpl.class::cast)
+ .collect(Collectors.toList());
+ final EnumSet<BootStrapProgress> reports = laggingFollowers.stream()
.map(follower -> checkProgress(follower, commitIndex))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
stagingState.fail(BootStrapProgress.NOPROGRESS);
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
- applyOldNewConf();
- getLogAppenders()
- .map(LogAppender::getFollower)
+ applyOldNewConf(stagingState);
+ this.stagingState = null;
+ laggingFollowers.stream()
.filter(f -> server.getRaftConf().containsInConf(f.getId()))
- .map(FollowerInfoImpl.class::cast)
.forEach(FollowerInfoImpl::catchUp);
}
}