RATIS-1902. The snapshot index is set incorrectly in InstallSnapshotReplyProto. (#933)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 422adf4..5c07d7f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -153,31 +153,31 @@
return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
}
+ private boolean installSnapshot() {
+ if (installSnapshotEnabled) {
+ final SnapshotInfo snapshot = shouldInstallSnapshot();
+ if (snapshot != null) {
+ installSnapshot(snapshot);
+ return true;
+ }
+ } else {
+ // check installSnapshotNotification
+ final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
+ if (firstAvailable != null) {
+ notifyInstallSnapshot(firstAvailable);
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public void run() throws IOException {
- boolean installSnapshotRequired;
for(; isRunning(); mayWait()) {
- installSnapshotRequired = false;
-
//HB period is expired OR we have messages OR follower is behind with commit index
if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
-
- if (installSnapshotEnabled) {
- SnapshotInfo snapshot = shouldInstallSnapshot();
- if (snapshot != null) {
- installSnapshot(snapshot);
- installSnapshotRequired = true;
- }
- } else {
- TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
- if (installSnapshotNotificationTermIndex != null) {
- installSnapshot(installSnapshotNotificationTermIndex);
- installSnapshotRequired = true;
- }
- }
-
- appendLog(installSnapshotRequired || haveTooManyPendingRequests());
-
+ final boolean installingSnapshot = installSnapshot();
+ appendLog(installingSnapshot || haveTooManyPendingRequests());
}
getLeaderState().checkHealth(getFollower());
}
@@ -403,14 +403,14 @@
}
try {
- onNextImpl(reply);
+ onNextImpl(request, reply);
} catch(Exception t) {
LOG.error("Failed onNext request=" + request
+ ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t);
}
}
- private void onNextImpl(AppendEntriesReplyProto reply) {
+ private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto reply) {
errCount.set(0);
if (!firstResponseReceived) {
@@ -435,8 +435,9 @@
break;
case INCONSISTENCY:
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
- LOG.warn("{}: received {} reply with nextIndex {}", this, reply.getResult(), reply.getNextIndex());
- updateNextIndex(Math.max(getFollower().getMatchIndex() + 1, reply.getNextIndex()));
+ LOG.warn("{}: received {} reply with nextIndex {}, request={}",
+ this, reply.getResult(), reply.getNextIndex(), request);
+ updateNextIndex(getNextIndexForInconsistency(request.getFirstIndex(), reply.getNextIndex()));
break;
default:
throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
@@ -662,7 +663,7 @@
snapshotRequestObserver.onCompleted();
grpcServerMetrics.onInstallSnapshot();
} catch (Exception e) {
- LOG.warn("{}: failed to install snapshot {}: {}", this, snapshot.getFiles(), e);
+ LOG.warn(this + ": failed to installSnapshot " + snapshot, e);
if (snapshotRequestObserver != null) {
snapshotRequestObserver.onError(e);
}
@@ -684,17 +685,17 @@
}
/**
- * Send installSnapshot request to Follower with only a notification that a snapshot needs to be installed.
- * @param firstAvailableLogTermIndex the first available log's index on the Leader
+ * Send an installSnapshot notification request to the Follower.
+ * @param firstAvailable the first available log's index on the Leader
*/
- private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
- LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, notify follower to install snapshot-{}",
- this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), firstAvailableLogTermIndex);
+ private void notifyInstallSnapshot(TermIndex firstAvailable) {
+ LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}",
+ this, firstAvailable, getFollower().getNextIndex());
final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
// prepare and enqueue the notify install snapshot request.
- final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
+ final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable);
if (LOG.isInfoEnabled()) {
LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
}
@@ -770,6 +771,7 @@
private final TermIndex previousLog;
private final int entriesCount;
+ private final TermIndex firstEntry;
private final TermIndex lastEntry;
private volatile Timestamp sendTime;
@@ -778,6 +780,7 @@
this.callId = proto.getServerRequest().getCallId();
this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
this.entriesCount = proto.getEntriesCount();
+ this.firstEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(0)): null;
this.lastEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(entriesCount - 1)): null;
this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), isHeartbeat());
@@ -792,7 +795,11 @@
return previousLog;
}
- public Timestamp getSendTime() {
+ long getFirstIndex() {
+ return Optional.ofNullable(firstEntry).map(TermIndex::getIndex).orElse(RaftLog.INVALID_LOG_INDEX);
+ }
+
+ Timestamp getSendTime() {
return sendTime;
}
@@ -811,10 +818,13 @@
@Override
public String toString() {
+ final String entries = entriesCount == 0? ""
+ : entriesCount == 1? ",entry=" + firstEntry
+ : ",entries=" + firstEntry + "..." + lastEntry;
return JavaUtils.getClassSimpleName(getClass())
+ ":cid=" + callId
+ ",entriesCount=" + entriesCount
- + ",lastEntry=" + lastEntry;
+ + entries;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index c2ec88a..f2be8c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -25,6 +25,8 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.util.Preconditions;
import java.util.Collection;
import java.util.List;
@@ -90,24 +92,22 @@
static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId,
long currentTerm, InstallSnapshotResult result, long installedSnapshotIndex) {
- final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
- replyId, isSuccess(result));
- final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
- .newBuilder().setServerReply(rb).setTerm(currentTerm).setResult(result);
- if (installedSnapshotIndex > 0) {
- builder.setSnapshotIndex(installedSnapshotIndex);
- }
- return builder.build();
+ final boolean success = isSuccess(result);
+ Preconditions.assertTrue(success || installedSnapshotIndex == RaftLog.INVALID_LOG_INDEX,
+ () -> "result=" + result + " but installedSnapshotIndex=" + installedSnapshotIndex);
+ final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId, replyId, success);
+ return InstallSnapshotReplyProto.newBuilder()
+ .setServerReply(rb)
+ .setTerm(currentTerm)
+ .setResult(result)
+ .setSnapshotIndex(installedSnapshotIndex > 0? installedSnapshotIndex: 0)
+ .build();
}
static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId,
- InstallSnapshotResult result) {
- final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
- replyId, isSuccess(result));
- final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
- .newBuilder().setServerReply(rb).setResult(result);
- return builder.build();
+ long currentTerm, InstallSnapshotResult result) {
+ return toInstallSnapshotReplyProto(requestorId, replyId, currentTerm, result, RaftLog.INVALID_LOG_INDEX);
}
static ReadIndexRequestProto toReadIndexRequestProto(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 334ef30..9794314 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -143,7 +143,7 @@
// There is a mismatch between configurations on leader and follower.
final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
- leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
+ leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
leaderId, request.hasSnapshotChunk(), server.getId(), installSnapshotEnabled);
@@ -209,7 +209,7 @@
currentTerm = state.getCurrentTerm();
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
+ currentTerm, InstallSnapshotResult.NOT_LEADER);
LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
return reply;
}
@@ -308,7 +308,7 @@
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
+ currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
@@ -335,7 +335,7 @@
InstallSnapshotResult.IN_PROGRESS);
}
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
+ currentTerm, InstallSnapshotResult.IN_PROGRESS);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index d57c46a..c685f2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -181,6 +181,22 @@
return null;
}
+ protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) {
+ long next = replyNextIndex;
+ final long i = getFollower().getMatchIndex() + 1;
+ if (i > next && i != requestFirstIndex) {
+ // Ideally, we should set nextIndex to a value greater than matchIndex.
+ // However, we must not resend the same first entry due to some special cases (e.g. the log is empty).
+ // Otherwise, the follower will reply INCONSISTENCY again.
+ next = i;
+ }
+ if (next == requestFirstIndex && next > RaftLog.LEAST_VALID_LOG_INDEX) {
+ // Avoid resending the same first entry.
+ next--;
+ }
+ return next;
+ }
+
@Override
public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 1efb21b..2d77519 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -49,6 +49,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -170,10 +171,18 @@
*/
@Test
public void testAddNewFollowers() throws Exception {
- runWithNewCluster(1, this::testAddNewFollowers);
+ final int numRequests = SNAPSHOT_TRIGGER_THRESHOLD*2 - 1; // trigger a snapshot
+ runWithNewCluster(1, c -> testAddNewFollowers(c, numRequests));
}
- private void testAddNewFollowers(CLUSTER cluster) throws Exception {
+ @Test
+ public void testAddNewFollowersNoSnapshot() throws Exception {
+ final int numRequests = SNAPSHOT_TRIGGER_THRESHOLD/8; // do not trigger a snapshot;
+ runWithNewCluster(1, c -> testAddNewFollowers(c, numRequests));
+ }
+
+ private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Exception {
+ final boolean shouldInstallSnapshot = numRequests >= SNAPSHOT_TRIGGER_THRESHOLD;
leaderSnapshotInfoRef.set(null);
final List<LogSegmentPath> logs;
int i = 0;
@@ -182,24 +191,27 @@
final RaftPeerId leaderId = cluster.getLeader().getId();
try(final RaftClient client = cluster.createClient(leaderId)) {
- for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply
- reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+ for (; i < numRequests; i++) {
+ final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
}
- // wait for the snapshot to be done
- final RaftServer.Division leader = cluster.getLeader();
- final long nextIndex = leader.getRaftLog().getNextIndex();
- LOG.info("nextIndex = {}", nextIndex);
- final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
- nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
- JavaUtils.attemptRepeatedly(() -> {
- Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
- return null;
- }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
- logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
+ if (shouldInstallSnapshot) {
+ // wait for the snapshot to be done
+ final RaftServer.Division leader = cluster.getLeader();
+ final long nextIndex = leader.getRaftLog().getNextIndex();
+ LOG.info("nextIndex = {}", nextIndex);
+ final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+ nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
+ JavaUtils.attemptRepeatedly(() -> {
+ Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+ return null;
+ }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+ logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
+ } else {
+ logs = Collections.emptyList();
+ }
} finally {
cluster.shutdown();
}
@@ -238,8 +250,9 @@
// Check the installed snapshot index on each Follower matches with the
// leader snapshot.
for (RaftServer.Division follower : cluster.getFollowers()) {
- Assert.assertEquals(leaderSnapshotInfo.getIndex(),
- RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+ final long expected = shouldInstallSnapshot ? leaderSnapshotInfo.getIndex() : RaftLog.INVALID_LOG_INDEX;
+ Assert.assertEquals(expected, RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
+ RaftSnapshotBaseTest.assertLogContent(follower, false);
}
// restart the peer and check if it can correctly handle conf change
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 7418dcb..9cce49a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -87,22 +87,28 @@
public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
- final RaftLog leaderLog = leader.getRaftLog();
- final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
- final LogEntryProto e = leaderLog.get(lastIndex);
+ assertLogContent(leader, true);
+ }
+
+ public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception {
+ final RaftLog log = server.getRaftLog();
+ final long lastIndex = log.getLastEntryTermIndex().getIndex();
+ final LogEntryProto e = log.get(lastIndex);
Assert.assertTrue(e.hasMetadataEntry());
JavaUtils.attemptRepeatedly(() -> {
- Assert.assertEquals(leaderLog.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
+ Assert.assertEquals(log.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
return null;
}, 50, BaseTest.HUNDRED_MILLIS, "CheckMetadataEntry", LOG);
- SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(leader);
- Assert.assertTrue("Is not notified as a leader", simpleStateMachine.isNotifiedAsLeader());
+ SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(server);
+ if (isLeader) {
+ Assert.assertTrue("Not notified as a leader", simpleStateMachine.isNotifiedAsLeader());
+ }
final LogEntryProto[] entries = simpleStateMachine.getContent();
long message = 0;
for (int i = 0; i < entries.length; i++) {
- LOG.info("{}) {} {}", i, message, entries[i]);
+ LOG.info("{}) {} {}", i, message, entries[i].toString().replace("\n", ", "));
if (entries[i].hasStateMachineLogEntry()) {
final SimpleMessage m = new SimpleMessage("m" + message++);
Assert.assertArrayEquals(m.getContent().toByteArray(),