RATIS-2024. Refactor appendEntries code. (#1040)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c47db14..7390093 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -108,6 +108,11 @@
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedSupplier;
+import static org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto;
+import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
+import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
+
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
@@ -1483,18 +1488,24 @@
ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
final AppendEntriesRequestProto r = requestRef.retain();
final RaftRpcRequestProto request = r.getServerRequest();
- final List<LogEntryProto> entries = r.getEntriesList();
final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null;
- final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
-
try {
- preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
- previous, r.getLeaderCommit(), r.getInitializing(), entries);
- return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
- request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef);
+ final RaftPeerId leaderId = RaftPeerId.valueOf(request.getRequestorId());
+ final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(request.getRaftGroupId());
+
+ CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, previous, r);
+
+ assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
+ if (!startComplete.get()) {
+ throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
+ }
+ assertGroup(leaderId, leaderGroupId);
+ validateEntries(r.getLeaderTerm(), previous, r.getEntriesList());
+
+ return appendEntriesAsync(leaderId, request.getCallId(), previous, requestRef);
} catch(Exception t) {
- LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
- throw t;
+ LOG.error("{}: Failed appendEntries* {}", getMemberId(), toAppendEntriesRequestString(r), t);
+ throw IOUtils.asIOException(t);
} finally {
requestRef.release();
}
@@ -1540,24 +1551,6 @@
}
}
- private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
- TermIndex previous, long leaderCommit, boolean initializing, List<LogEntryProto> entries) throws IOException {
- CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
- leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-
- assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
- if (!startComplete.get()) {
- throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
- }
- assertGroup(leaderId, leaderGroupId);
-
- try {
- validateEntries(leaderTerm, previous, entries);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- }
- }
-
private long updateCommitInfoCache() {
return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
}
@@ -1566,19 +1559,15 @@
return serverExecutor;
}
- @SuppressWarnings("checkstyle:parameternumber")
- private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
- RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
- List<CommitInfoProto> commitInfos, List<LogEntryProto> entries,
- ReferenceCountedObject<?> requestRef) throws IOException {
+ private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId leaderId, long callId,
+ TermIndex previous, ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
+ final AppendEntriesRequestProto proto = requestRef.get();
+ final List<LogEntryProto> entries = proto.getEntriesList();
final boolean isHeartbeat = entries.isEmpty();
- logAppendEntries(isHeartbeat,
- () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
- + previous + ", " + leaderCommit + ", " + initializing
- + ", commits:" + ProtoUtils.toString(commitInfos)
- + ", cId:" + callId
- + ", entries: " + LogProtoUtils.toLogEntriesString(entries));
+ logAppendEntries(isHeartbeat, () -> getMemberId() + ": appendEntries* "
+ + toAppendEntriesRequestString(proto));
+ final long leaderTerm = proto.getLeaderTerm();
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
@@ -1586,17 +1575,12 @@
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
- final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
+ final boolean recognized = state.recognizeLeader(Op.APPEND_ENTRIES, leaderId, leaderTerm);
if (!recognized) {
- final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
+ return CompletableFuture.completedFuture(toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
- AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
- getMemberId(), leaderId, leaderTerm, state, ServerStringUtils.toAppendEntriesReplyString(reply));
- }
- return CompletableFuture.completedFuture(reply);
+ AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
}
try {
changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
@@ -1605,7 +1589,7 @@
}
state.setLeader(leaderId, "appendEntries");
- if (!initializing && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
+ if (!proto.getInitializing() && lifeCycle.compareAndTransition(State.STARTING, State.RUNNING)) {
role.startFollowerState(this, Op.APPEND_ENTRIES);
}
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
@@ -1617,12 +1601,14 @@
// 3. There is a gap between the local log and the entries
// In any of these scenarios, we should return an INCONSISTENCY reply
// back to leader so that the leader can update this follower's next index.
-
- AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries(
- leaderId, currentTerm, followerCommit, previous, callId, isHeartbeat, entries);
- if (inconsistencyReply != null) {
+ final long inconsistencyReplyNextIndex = checkInconsistentAppendEntries(previous, entries);
+ if (inconsistencyReplyNextIndex > RaftLog.INVALID_LOG_INDEX) {
+ final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(
+ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextIndex,
+ AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
+ LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
- return CompletableFuture.completedFuture(inconsistencyReply);
+ return CompletableFuture.completedFuture(reply);
}
state.updateConfiguration(entries);
@@ -1631,7 +1617,7 @@
final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
: state.getLog().append(requestRef.delegate(entries));
- commitInfos.forEach(commitInfoCache::update);
+ proto.getCommitInfosList().forEach(commitInfoCache::update);
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
if (!isHeartbeat) {
@@ -1641,49 +1627,27 @@
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
}
}
- return JavaUtils.allOf(futures).whenCompleteAsync(
- (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
- serverExecutor
- ).thenApply(v -> {
- final AppendEntriesReplyProto reply;
- synchronized(this) {
- final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.size());
- state.updateCommitIndex(commitIndex, currentTerm, false);
+
+ final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
+ final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
+ return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
+ followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
+ timer.stop();
+ }, getServerExecutor()).thenApply(v -> {
+ final boolean updated = state.updateCommitIndex(commitIndex, currentTerm, false);
+ if (updated) {
updateCommitInfoCache();
- final long n;
- final long matchIndex;
- if (!isHeartbeat) {
- LogEntryProto requestLastEntry = entries.get(entries.size() - 1);
- n = requestLastEntry.getIndex() + 1;
- matchIndex = requestLastEntry.getIndex();
- } else {
- n = state.getLog().getNextIndex();
- matchIndex = RaftLog.INVALID_LOG_INDEX;
- }
- reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
- state.getLog().getLastCommittedIndex(), n, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
}
- logAppendEntries(isHeartbeat, () -> getMemberId() + ": succeeded to handle AppendEntries. Reply: "
- + ServerStringUtils.toAppendEntriesReplyString(reply));
- timer.stop(); // TODO: future never completes exceptionally?
+ final long nextIndex = isHeartbeat? state.getNextIndex(): matchIndex + 1;
+ final AppendEntriesReplyProto reply = toAppendEntriesReplyProto(leaderId, getMemberId(),
+ currentTerm, updated? commitIndex : state.getLog().getLastCommittedIndex(),
+ nextIndex, AppendResult.SUCCESS, callId, matchIndex, isHeartbeat);
+ logAppendEntries(isHeartbeat, () -> getMemberId()
+ + ": appendEntries* reply " + toAppendEntriesReplyString(reply));
return reply;
});
}
- private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
- long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, List<LogEntryProto> entries) {
- final long replyNextIndex = checkInconsistentAppendEntries(previous, entries);
- if (replyNextIndex == -1) {
- return null;
- }
-
- final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
- AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
- LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), ServerStringUtils.toAppendEntriesReplyString(reply));
- return reply;
- }
-
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
// Check if a snapshot installation through state machine is in progress.
final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
@@ -1714,7 +1678,7 @@
return replyNextIndex;
}
- return -1;
+ return RaftLog.INVALID_LOG_INDEX;
}
@Override
@@ -1762,30 +1726,25 @@
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
- final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
-
CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);
- LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{},",
- getMemberId(), leaderId, request.getLeaderLastEntry());
+ if (!request.hasLeaderLastEntry()) {
+ // It should have a leaderLastEntry since there is a placeHolder entry.
+ LOG.warn("{}: leaderLastEntry is missing in {}", getMemberId(), request);
+ return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ }
+
+ final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
+ LOG.debug("{}: receive startLeaderElection from {} with lastEntry {}", getMemberId(), leaderId, leaderLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(leaderId, leaderGroupId);
synchronized (this) {
- // leaderLastEntry should not be null because LeaderStateImpl#start append a placeHolder entry
- // so leader at each term should has at least one entry
- if (leaderLastEntry == null) {
- LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId());
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
- }
-
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
- final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm());
+ final boolean recognized = state.recognizeLeader("startLeaderElection", leaderId, leaderLastEntry.getTerm());
if (!recognized) {
- LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}",
- getMemberId(), leaderId, leaderLastEntry.getTerm(), state);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e21f63c..d02994e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -321,24 +321,21 @@
Objects.requireNonNull(operation.getLogEntry());
}
- /**
- * Check if accept the leader selfId and term from the incoming AppendEntries rpc.
- * If accept, update the current state.
- * @return true if the check passes
- */
- boolean recognizeLeader(RaftPeerId peerLeaderId, long leaderTerm) {
+ /** @return true iff the given peer id is recognized as the leader. */
+ boolean recognizeLeader(Object op, RaftPeerId peerId, long peerTerm) {
final long current = currentTerm.get();
- if (leaderTerm < current) {
+ if (peerTerm < current) {
+ LOG.warn("{}: Failed to recognize {} as leader for {} since peerTerm = {} < currentTerm = {}",
+ getMemberId(), peerId, op, peerTerm, current);
return false;
}
final RaftPeerId curLeaderId = getLeaderId();
- if (leaderTerm > current || curLeaderId == null) {
- // If the request indicates a term that is greater than the current term
- // or no leader has been set for the current term, make sure to update
- // leader and term later
- return true;
+ if (peerTerm == current && curLeaderId != null && !curLeaderId.equals(peerId)) {
+ LOG.warn("{}: Failed to recognize {} as leader for {} since current leader is {} (peerTerm = currentTerm = {})",
+ getMemberId(), peerId, op, curLeaderId, current);
+ return false;
}
- return curLeaderId.equals(peerLeaderId);
+ return true;
}
static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 9794314..7aae944 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -32,6 +32,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
@@ -49,6 +50,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toInstallSnapshotReplyProto;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto;
import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
class SnapshotInstallationHandler {
@@ -142,7 +145,7 @@
}
// There is a mismatch between configurations on leader and follower.
- final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
+ final InstallSnapshotReplyProto failedReply = toInstallSnapshotReplyProto(
leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
@@ -158,13 +161,11 @@
final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
synchronized (server) {
- final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+ final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
- final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
- LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId());
- return reply;
}
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
@@ -193,7 +194,7 @@
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
}
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
}
@@ -205,13 +206,11 @@
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
synchronized (server) {
- final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+ final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
- final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER);
- LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
- return reply;
}
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
@@ -229,7 +228,7 @@
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}
@@ -307,7 +306,7 @@
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
}
@@ -325,7 +324,7 @@
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
installedIndex.set(latestInstalledIndex);
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());
}
@@ -334,7 +333,7 @@
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS);
}
}
@@ -342,7 +341,7 @@
private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
final RoleInfo role = server.getRole();
final Optional<FollowerState> fs = role.getFollowerState();
- final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(leader,
+ final ServerRpcProto leaderInfo = toServerRpcProto(leader,
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder()
.setLeaderInfo(leaderInfo)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
index 25223c0..284664d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
@@ -21,16 +21,19 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.util.ProtoUtils;
+import java.util.List;
+
/**
* This class provides convenient utilities for converting Protocol Buffers messages to strings.
* The output strings are for information purpose only.
* They are concise and compact compared to the Protocol Buffers implementations of {@link Object#toString()}.
- *
+ * <p>
* The output messages or the output formats may be changed without notice.
* Callers of this class should not try to parse the output strings for any purposes.
* Instead, they should use the public APIs provided by Protocol Buffers.
@@ -42,12 +45,13 @@
if (request == null) {
return null;
}
+ final List<LogEntryProto> entries = request.getEntriesList();
return ProtoUtils.toString(request.getServerRequest())
+ "-t" + request.getLeaderTerm()
+ ",previous=" + TermIndex.valueOf(request.getPreviousLog())
+ ",leaderCommit=" + request.getLeaderCommit()
+ ",initializing? " + request.getInitializing()
- + ",entries: " + LogProtoUtils.toLogEntriesShortString(request.getEntriesList());
+ + "," + (entries.isEmpty()? "HEARTBEAT" : "entries: " + LogProtoUtils.toLogEntriesShortString(entries));
}
public static String toAppendEntriesReplyString(AppendEntriesReplyProto reply) {