package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class);
private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName();
static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
private final RaftServerProxy proxy;
private final StateMachine stateMachine;
private final int minTimeoutMs;
private final int maxTimeoutMs;
private final int rpcSlownessTimeoutMs;
private final int sleepDeviationThresholdMs;
private final boolean installSnapshotEnabled;
private final LifeCycle lifeCycle;
private final ServerState state;
private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() -> new RaftPeer(getId(), getServerRpc().getInetSocketAddress()));
private final RoleInfo role;
private final RetryCache retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final RaftServerJmxAdapter jmxAdapter;
private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
final RaftPeerId id = proxy.getId();"{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
this.lifeCycle = new LifeCycle(id);
this.stateMachine = stateMachine;
this.role = new RoleInfo(id);
final RaftProperties properties = proxy.getProperties();
minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
sleepDeviationThresholdMs = RaftServerConfigKeys.sleepDeviationThreshold(properties);
installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
"max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
this.proxy = proxy;
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = initRetryCache(properties);
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
this.jmxAdapter = new RaftServerJmxAdapter();
private RetryCache initRetryCache(RaftProperties prop) {
final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop);
return new RetryCache(expireTime);
LogAppender newLogAppender(
LeaderState state, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
boolean attendVote) {
final FollowerInfo f = new FollowerInfo(getMemberId(), peer, lastRpcTime, nextIndex, attendVote,
return getProxy().getFactory().newLogAppender(this, state, f);
RaftPeer getPeer() {
return peerSupplier.get();
int getMinTimeoutMs() {
return minTimeoutMs;
int getMaxTimeoutMs() {
return maxTimeoutMs;
int getRandomTimeoutMs() {
return minTimeoutMs + ThreadLocalRandom.current().nextInt(
maxTimeoutMs - minTimeoutMs + 1);
int getSleepDeviationThresholdMs() {
return sleepDeviationThresholdMs;
public RaftGroupId getGroupId() {
return getMemberId().getGroupId();
public StateMachine getStateMachine() {
return stateMachine;
public RetryCache getRetryCache() {
return retryCache;
public RaftServerProxy getProxy() {
return proxy;
public RaftServerRpc getServerRpc() {
return proxy.getServerRpc();
private void setRole(RaftPeerRole newRole, Object reason) {"{}: changes role from {} to {} at term {} for {}",
getMemberId(), this.role, newRole, state.getCurrentTerm(), reason);
boolean start() {
if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
return false;
RaftConfiguration conf = getRaftConf();
if (conf != null && conf.contains(getId())) {"{}: start as a follower, conf={}", getMemberId(), conf);
} else {"{}: start with initializing state, conf={}", getMemberId(), conf);
registerMBean(getId(), getGroupId(), jmxAdapter, jmxAdapter);
return true;
static boolean registerMBean(
RaftPeerId id, RaftGroupId groupdId, RaftServerMXBean mBean, JmxRegister jmx) {
final String prefix = "Ratis:service=RaftServer,group=" + groupdId + ",id=";
final String registered = jmx.register(mBean, Arrays.asList(
() -> prefix + id,
() -> prefix + ObjectName.quote(id.toString())));
return registered != null;
* The peer belongs to the current configuration, should start as a follower
private void startAsFollower() {
setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
* The peer does not have any configuration (maybe it will later be included
* in some configuration). Start still as a follower but will not vote or
* start election.
private void startInitializing() {
setRole(RaftPeerRole.FOLLOWER, "startInitializing");
// do not start FollowerState
public ServerState getState() {
return state;
public RaftGroupMemberId getMemberId() {
return getState().getMemberId();
public RaftPeerId getId() {
return getMemberId().getPeerId();
RoleInfo getRole() {
return role;
RaftConfiguration getRaftConf() {
return getState().getRaftConf();
RaftGroup getGroup() {
return RaftGroup.valueOf(getGroupId(), getRaftConf().getPeers());
public void shutdown(boolean deleteDirectory) {
lifeCycle.checkStateAndClose(() -> {"{}: shutdown", getMemberId());
try {
} catch (Exception ignored) {
LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), ignored);
try {
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), ignored);
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), ignored);
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), ignored);
} catch (Exception ignored) {
LOG.warn("{}: Failed to close state", getMemberId(), ignored);
if (deleteDirectory) {
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
try {
} catch(Exception ignored) {
LOG.warn("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, ignored);
public boolean isAlive() {
return !lifeCycle.getCurrentState().isClosingOrClosed();
public boolean isFollower() {
return role.isFollower();
public boolean isCandidate() {
return role.isCandidate();
public boolean isLeader() {
return role.isLeader();
* Change the server state to Follower if this server is in a different role or force is true.
* @param newTerm The new term.
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
final RaftPeerRole old = role.getCurrentRole();
final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
if (old != RaftPeerRole.FOLLOWER || force) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
} else if (old == RaftPeerRole.CANDIDATE) {
} else if (old == RaftPeerRole.FOLLOWER) {
return metadataUpdated;
synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException {
if (changeToFollower(newTerm, false, reason)) {
synchronized void changeToLeader() {
setRole(RaftPeerRole.LEADER, "changeToLeader");
// start sending AppendEntries RPC to followers
final LogEntryProto e = role.startLeaderState(this, getProxy().getProperties());
Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
infos.add(commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex()));
// add the commit infos of other servers
if (isLeader()) {
leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
} else {
.filter(id -> !id.equals(getId()))
return infos;
GroupInfoReply getGroupInfo(GroupInfoRequest request) {
return new GroupInfoReply(request, getRoleInfoProto(),
state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup());
public RoleInfoProto getRoleInfoProto() {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
switch (currentRole) {
CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
role.getFollowerState().ifPresent(fs -> {
final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
getRaftConf().getPeer(state.getLeaderId()), fs.getLastRpcTime().elapsedTimeMs());
case LEADER:
role.getLeaderState().ifPresent(ls -> {
final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
throw new IllegalStateException("incorrect role of server " + currentRole);
synchronized void changeToCandidate() {
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
if (state.shouldNotifyExtendedNoLeader()) {
// start election
public String toString() {
return role + " " + state + " " + lifeCycle.getCurrentState();
* @return null if the server is in leader state.
private CompletableFuture<RaftClientReply> checkLeaderState(
RaftClientRequest request, RetryCache.CacheEntry entry) {
try {
assertGroup(request.getRequestorId(), request.getRaftGroupId());
} catch (GroupMismatchException e) {
return RetryCache.failWithException(e, entry);
if (!isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = new RaftClientReply(request, exception, getCommitInfos());
return RetryCache.failWithReply(reply, entry);
final LeaderState leaderState = role.getLeaderState().orElse(null);
if (leaderState == null || !leaderState.isReady()) {
RetryCache.CacheEntry cacheEntry = retryCache.get(request.getClientId(), request.getCallId());
if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
return cacheEntry.getReplyFuture();
final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId());
final RaftClientReply reply = new RaftClientReply(request, lnre, getCommitInfos());
return RetryCache.failWithReply(reply, entry);
return null;
NotLeaderException generateNotLeaderException() {
if (lifeCycle.getCurrentState() != RUNNING) {
return new NotLeaderException(getMemberId(), null, null);
RaftPeerId leaderId = state.getLeaderId();
if (leaderId == null || leaderId.equals(getId())) {
// No idea about who is the current leader. Or the peer is the current
// leader, but it is about to step down. set the suggested leader as null.
leaderId = null;
RaftConfiguration conf = getRaftConf();
Collection<RaftPeer> peers = conf.getPeers();
return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), peers);
private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException {
return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
getMemberId() + " is not in " + Arrays.toString(expected) + ": current state is " + c),
void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
final RaftGroupId groupId = getGroupId();
if (!groupId.equals(requestorGroupId)) {
throw new GroupMismatchException(getMemberId()
+ ": The group (" + requestorGroupId + ") of " + requestorId
+ " does not match the group (" + groupId + ") of the server " + getId());
* Handle a normal update request from client.
private CompletableFuture<RaftClientReply> appendTransaction(
RaftClientRequest request, TransactionContext context,
RetryCache.CacheEntry cacheEntry) throws IOException {
CompletableFuture<RaftClientReply> reply;
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, cacheEntry);
if (reply != null) {
return reply;
// append the message to its local log
final LeaderState leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest();
if (permit == null) {
return JavaUtils.completeExceptionally(new ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for " + request));
try {
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
RaftClientReply exceptionReply = new RaftClientReply(request, e, getCommitInfos());
// leader will step down here
if (isLeader()) {
return CompletableFuture.completedFuture(exceptionReply);
// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
return JavaUtils.completeExceptionally(new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " + request));
return pending.getFuture();
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
LOG.debug("{}: receive client request({})", getMemberId(), request);
if ( {
return staleReadAsync(request);
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
// let the state machine handle read-only request from client
final StateMachine stateMachine = getStateMachine();
if ( {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
return processQueryFuture(stateMachine.query(request.getMessage()), request);
if ( {
return watchAsync(request);
// query the retry cache
RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
request.getClientId(), request.getCallId());
if (previousResult.isRetry()) {
// if the previous attempt is still pending or it succeeded, return its
// future
return previousResult.getEntry().getReplyFuture();
final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
TransactionContext context = stateMachine.startTransaction(request);
if (context.getException() != null) {
RaftClientReply exceptionReply = new RaftClientReply(request,
new StateMachineException(getMemberId(), context.getException()), getCommitInfos());
return CompletableFuture.completedFuture(exceptionReply);
return appendTransaction(request, context, cacheEntry);
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
.orElseGet(() -> CompletableFuture.completedFuture(
new RaftClientReply(request, generateNotLeaderException(), getCommitInfos())));
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
final long minIndex = request.getType().getStaleRead().getMinIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex, commitIndex);
if (commitIndex < minIndex) {
final StaleReadException e = new StaleReadException(
"Unable to serve stale-read due to server commit index = " + commitIndex + " < min = " + minIndex);
return CompletableFuture.completedFuture(
new RaftClientReply(request, new StateMachineException(getMemberId(), e), getCommitInfos()));
return processQueryFuture(getStateMachine().queryStale(request.getMessage(), minIndex), request);
CompletableFuture<RaftClientReply> processQueryFuture(
CompletableFuture<Message> queryFuture, RaftClientRequest request) {
return queryFuture.thenApply(r -> new RaftClientReply(request, r, getCommitInfos()))
.exceptionally(e -> {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof StateMachineException) {
return new RaftClientReply(request, (StateMachineException)e, getCommitInfos());
throw new CompletionException(e);
public RaftClientReply submitClientRequest(RaftClientRequest request)
throws IOException {
return waitForReply(request, submitClientRequestAsync(request));
RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftClientReply> future)
throws IOException {
return waitForReply(getMemberId(), request, future, e -> new RaftClientReply(request, e, getCommitInfos()));
static <REPLY extends RaftClientReply> REPLY waitForReply(
Object id, RaftClientRequest request, CompletableFuture<REPLY> future,
Function<RaftException, REPLY> exceptionReply)
throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
final String s = id + ": Interrupted when waiting for reply, request=" + request;, e);
throw IOUtils.toInterruptedIOException(s, e);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause == null) {
throw new IOException(e);
if (cause instanceof NotLeaderException ||
cause instanceof StateMachineException) {
final REPLY reply = exceptionReply.apply((RaftException) cause);
if (reply != null) {
return reply;
throw IOUtils.asIOException(cause);
public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
return waitForReply(request, setConfigurationAsync(request));
* Handle a raft configuration change request from client.
public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException {"{}: receive setConfiguration {}", getMemberId(), request);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, null);
if (reply != null) {
return reply;
final RaftConfiguration current = getRaftConf();
final LeaderState leaderState = role.getLeaderStateNonNull();
// make sure there is no other raft reconfiguration in progress
if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
throw new ReconfigurationInProgressException(
"Reconfiguration is already in progress: " + current);
// return success with a null message if the new conf is the same as the current
if (current.hasNoChange(peersInNewConf)) {
pending = new PendingRequest(request);
pending.setReply(new RaftClientReply(request, getCommitInfos()));
return pending.getFuture();
// add new peers into the rpc service
// add staging state into the leaderState
pending = leaderState.startSetConfiguration(request);
return pending.getFuture();
private boolean shouldWithholdVotes(long candidateTerm) {
if (state.getCurrentTerm() < candidateTerm) {
return false;
} else if (isLeader()) {
return true;
} else {
// following a leader and not yet timeout
return isFollower() && state.hasLeader()
&& role.getFollowerState().map(FollowerState::shouldWithholdVotes).orElse(false);
* check if the remote peer is not included in the current conf
* and should shutdown. should shutdown if all the following stands:
* 1. this is a leader
* 2. current conf is stable and has been committed
* 3. candidate id is not included in conf
* 4. candidate's last entry's index < conf's index
private boolean shouldSendShutdown(RaftPeerId candidateId,
TermIndex candidateLastEntry) {
return isLeader()
&& getRaftConf().isStable()
&& getState().isConfCommitted()
&& !getRaftConf().containsInConf(candidateId)
&& candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
&& role.getLeaderState().map(ls -> !ls.isBootStrappingPeer(candidateId)).orElse(false);
public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
throws IOException {
final RaftRpcRequestProto request = r.getServerRequest();
return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
private RequestVoteReplyProto requestVote(
RaftPeerId candidateId, RaftGroupId candidateGroupId,
long candidateTerm, TermIndex candidateLastEntry) throws IOException {
CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
candidateId, candidateTerm, candidateLastEntry);
LOG.debug("{}: receive requestVote({}, {}, {}, {})",
getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
assertGroup(candidateId, candidateGroupId);
boolean voteGranted = false;
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
synchronized (this) {
final FollowerState fs = role.getFollowerState().orElse(null);
if (shouldWithholdVotes(candidateTerm)) {"{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
getMemberId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
} else if (state.recognizeCandidate(candidateId, candidateTerm)) {
final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
// see Section 5.4.1 Election restriction
if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
voteGranted = true;
if (termUpdated || voteGranted) {
state.persistMetadata(); // sync metafile
if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) {
shouldShutdown = true;
reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
voteGranted, state.getCurrentTerm(), shouldShutdown);
if (LOG.isDebugEnabled()) {
LOG.debug("{} replies to vote request: {}. Peer's state: {}",
getMemberId(), ServerProtoUtils.toString(reply), state);
return reply;
private void validateEntries(long expectedTerm, TermIndex previous,
LogEntryProto... entries) {
if (entries != null && entries.length > 0) {
final long index0 = entries[0].getIndex();
if (previous == null || previous.getTerm() == 0) {
Preconditions.assertTrue(index0 == 0,
"Unexpected Index: previous is null but entries[%s].getIndex()=%s",
0, index0);
} else {
Preconditions.assertTrue(previous.getIndex() == index0 - 1,
"Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
previous, 0, index0);
for (int i = 0; i < entries.length; i++) {
final long t = entries[i].getTerm();
Preconditions.assertTrue(expectedTerm >= t,
"Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
i, t, expectedTerm);
final long indexi = entries[i].getIndex();
Preconditions.assertTrue(indexi == index0 + i,
"Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
i, indexi, index0);
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
throws IOException {
try {
return appendEntriesAsync(r).join();
} catch (CompletionException e) {
throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e));
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto r)
throws IOException {
// TODO avoid converting list to array
final RaftRpcRequestProto request = r.getServerRequest();
final LogEntryProto[] entries = r.getEntriesList()
.toArray(new LogEntryProto[r.getEntriesCount()]);
final TermIndex previous = r.hasPreviousLog() ?
ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
previous, r.getLeaderCommit(), r.getInitializing(), entries);
try {
return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries);
} catch(Throwable t) {
LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
throw t;
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
if (isHeartbeat) {
if (LOG.isTraceEnabled()) {
LOG.trace("HEARTBEAT: " + message.get());
} else {
if (LOG.isDebugEnabled()) {
private Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType updateType) {
final Optional<FollowerState> fs = role.getFollowerState();
if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) {
return fs;
} else {
return Optional.empty();
private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException {
CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING);
if (currentState == STARTING) {
if (role.getCurrentRole() == null) {
throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
assertGroup(leaderId, leaderGroupId);
try {
validateEntries(leaderTerm, previous, entries);
} catch (IllegalArgumentException e) {
throw new IOException(e);
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
final boolean isHeartbeat = entries.length == 0;
() -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
+ previous + ", " + leaderCommit + ", " + initializing
+ ", commits" + ProtoUtils.toString(commitInfos)
+ ", entries: " + ServerProtoUtils.toString(entries));
final List<CompletableFuture<Long>> futures;
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), NOT_LEADER, callId);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
getMemberId(), leaderId, leaderTerm, state, ServerProtoUtils.toString(reply));
return CompletableFuture.completedFuture(reply);
try {
changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
state.setLeader(leaderId, "appendEntries");
if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
// Check that the append entries are not inconsistent. There are 3
// scenarios which can result in inconsistency:
// 1. There is a snapshot installation in progress
// 2. There is an overlap between the snapshot index and the entries
// 3. There is a gap between the local log and the entries
// In any of these scenarios, we should return an INCONSISTENCY reply
// back to leader so that the leader can update this follower's next index.
AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries(
leaderId, currentTerm, followerCommit, previous, callId, entries);
if (inconsistencyReply != null) {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return CompletableFuture.completedFuture(inconsistencyReply);
futures = state.getLog().append(entries);
if (!isHeartbeat) {
CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
return JavaUtils.allOf(futures).whenCompleteAsync(
(r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
state.updateStatemachine(leaderCommit, currentTerm);
final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
state.getLog().getLastCommittedIndex(), n, SUCCESS, callId);
logAppendEntries(isHeartbeat, () ->
getMemberId() + ": succeeded to handle AppendEntries. Reply: " + ServerProtoUtils.toString(reply));
return reply;
private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
long followerCommit, TermIndex previous, long callId, LogEntryProto... entries) {
final long replyNextIndex = checkInconsistentAppendEntries(previous, entries);
if (replyNextIndex == -1) {
return null;
final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, INCONSISTENCY, callId);"{}: inconsistency entries. Reply:{}", getMemberId(), ServerProtoUtils.toString(reply));
return reply;
private long checkInconsistentAppendEntries(TermIndex previous, LogEntryProto... entries) {
// Check if a snapshot installation through state machine is in progress.
final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
if (installSnapshot != null) {"{}: Failed appendEntries as snapshot ({}) installation is in progress", getMemberId(), installSnapshot);
return installSnapshot.getIndex();
// Check that the first log entry is greater than the snapshot index in the latest snapshot.
// If not, reply to the leader the new next index.
if (entries != null && entries.length > 0) {
final long firstEntryIndex = entries[0].getIndex();
final long snapshotIndex = state.getSnapshotIndex();
if (snapshotIndex > 0 && snapshotIndex >= firstEntryIndex) {"{}: Failed appendEntries as latest snapshot ({}) already has the append entries (first index: {})",
getMemberId(), snapshotIndex, firstEntryIndex);
return snapshotIndex + 1;
// Check if "previous" is contained in current state.
if (previous != null && !state.containsTermIndex(previous)) {
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());"{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
return replyNextIndex;
return -1;
public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
if (LOG.isInfoEnabled()) {"{}: receive installSnapshot: {}", getMemberId(), ServerProtoUtils.toString(request));
final InstallSnapshotReplyProto reply;
try {
reply = installSnapshotImpl(request);
} catch (Throwable t) {
LOG.error("{}: installSnapshot failed", getMemberId(), t);
throw t;
if (LOG.isInfoEnabled()) {"{}: reply installSnapshot: {}", getMemberId(), ServerProtoUtils.toString(reply));
return reply;
private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(),
leaderId, request);
assertLifeCycleState(STARTING, RUNNING);
assertGroup(leaderId, leaderGroupId);
// Check if install snapshot from Leader is enabled
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
if (request.hasSnapshotChunk()) {
return checkAndInstallSnapshot(request, leaderId);
} else {
// Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
if (request.hasNotification()) {
return notifyStateMachineToInstallSnapshot(request, leaderId);
// There is a mismatch between configurations on leader and follower.
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(
leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
return reply;
private InstallSnapshotReplyProto checkAndInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastTermIndex.getIndex();
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId());
return reply;
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
try {
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
// have a lot of requests
state.getLog().getNextIndex() <= lastIncludedIndex,
"%s log's next id is %s, last included index in snapshot is %s",
getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);
//TODO: We should only update State with installed snapshot once the request is done.
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncludedIndex, leaderTerm);
} finally {
if (snapshotChunkRequest.getDone()) {"{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = ServerProtoUtils.toTermIndex(
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
return reply;
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
if (inProgressInstallSnapshotRequest.compareAndSet(null, firstAvailableLogTermIndex)) {
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
long snapshotIndex = state.getSnapshotIndex();
if (snapshotIndex + 1 >= firstAvailableLogIndex) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(
leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);"{}: StateMachine snapshotIndex is {}", getMemberId(), snapshotIndex);
return reply;
// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot."{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
stateMachine.notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.error("{}: State Machine failed to install snapshot", getMemberId(), exception);
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
if (reply != null) {
state.reloadStateMachine(reply.getIndex(), leaderTerm);
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SUCCESS, -1);
}"{}: StateMachine installSnapshot is in progress: {}",
getMemberId(), inProgressInstallSnapshotRequest.get());
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
RaftPeerId targetId, String requestId, int requestIndex,
SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
OptionalLong totalSize = snapshot.getFiles().stream()
assert totalSize.isPresent();
return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(), targetId,
requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
chunks, totalSize.getAsLong(), done);
synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
RaftPeerId targetId, TermIndex firstAvailableLogTermIndex) {
assert (firstAvailableLogTermIndex.getIndex() > 0);
return ServerProtoUtils.toInstallSnapshotRequestProto(getMemberId(), targetId,
state.getCurrentTerm(), firstAvailableLogTermIndex);
synchronized RequestVoteRequestProto createRequestVoteRequest(
RaftPeerId targetId, long term, TermIndex lastEntry) {
return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId, term, lastEntry);
public void submitUpdateCommitEvent() {
* The log has been submitted to the state machine. Use the future to update
* the pending requests and retry cache.
* @param logEntry the log entry that has been submitted to the state machine
* @param stateMachineFuture the future returned by the state machine
* from which we will get transaction result later
private CompletableFuture<Message> replyPendingRequest(
LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) {
final StateMachineLogEntryProto smLog = logEntry.getStateMachineLogEntry();
// update the retry cache
final ClientId clientId = ClientId.valueOf(smLog.getClientId());
final long callId = smLog.getCallId();
final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(clientId, callId);
if (cacheEntry.isFailed()) {
retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
final long logIndex = logEntry.getIndex();
return stateMachineFuture.whenComplete((reply, exception) -> {
final RaftClientReply r;
if (exception == null) {
r = new RaftClientReply(clientId, getMemberId(), callId, true, reply, null, logIndex, getCommitInfos());
} else {
// the exception is coming from the state machine. wrap it into the
// reply as a StateMachineException
final StateMachineException e = new StateMachineException(getMemberId(), exception);
r = new RaftClientReply(clientId, getMemberId(), callId, false, null, e, logIndex, getCommitInfos());
// update pending request
synchronized (RaftServerImpl.this) {
final LeaderState leaderState = role.getLeaderState().orElse(null);
if (isLeader() && leaderState != null) { // is leader and is running
leaderState.replyPendingRequest(logIndex, r);
public long[] getFollowerNextIndices() {
if (!isLeader()) {
return null;
return role.getLeaderState().map(LeaderState::getFollowerNextIndices).orElse(null);
CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
final StateMachine stateMachine = getStateMachine();
if (!next.hasStateMachineLogEntry()) {
stateMachine.notifyIndexUpdate(next.getTerm(), next.getIndex());
if (next.hasConfigurationEntry()) {
// the reply should have already been set. only need to record
// the new conf in the metadata file.
} else if (next.hasStateMachineLogEntry()) {
// check whether there is a TransactionContext because we are the leader.
TransactionContext trx = role.getLeaderState()
.map(leader -> leader.getTransactionContext(next.getIndex())).orElseGet(
() -> TransactionContext.newBuilder()
// Let the StateMachine inject logic for committed transactions in sequential order.
trx = stateMachine.applyTransactionSerial(trx);
try {
// TODO: This step can be parallelized
CompletableFuture<Message> stateMachineFuture =
return replyPendingRequest(next, stateMachineFuture);
} catch (Throwable e) {
LOG.error("{}: applyTransaction failed for index:{} proto:{}",
getMemberId(), next.getIndex(), ServerProtoUtils.toString(next), e);
throw e;
return null;
* The given log entry is being truncated.
* Fail the corresponding client request, if there is any.
* @param logEntry the log entry being truncated
public void notifyTruncatedLogEntry(LogEntryProto logEntry) {
if (logEntry.hasStateMachineLogEntry()) {
final StateMachineLogEntryProto smLog = logEntry.getStateMachineLogEntry();
final ClientId clientId = ClientId.valueOf(smLog.getClientId());
final long callId = smLog.getCallId();
final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, callId);
if (cacheEntry != null) {
final RaftClientReply reply = new RaftClientReply(clientId, getMemberId(),
callId, false, null, generateNotLeaderException(),
logEntry.getIndex(), getCommitInfos());
private class RaftServerJmxAdapter extends JmxRegister implements RaftServerMXBean {
public String getId() {
return getMemberId().getPeerId().toString();
public String getLeaderId() {
return getState().getLeaderId().toString();
public long getCurrentTerm() {
return getState().getCurrentTerm();
public String getGroupId() {
return RaftServerImpl.this.getGroupId().toString();
public String getRole() {
return role.toString();
public List<String> getFollowers() {
return role.getLeaderState().map(LeaderState::getFollowers).orElse(Collections.emptyList())