/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.*;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.codahale.metrics.Timer;
import org.apache.ratis.util.function.CheckedSupplier;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.PAUSED;
import static org.apache.ratis.util.LifeCycle.State.PAUSING;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
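/** A read-only view of this division, backed by the current role, life cycle and server state. */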
class Info implements DivisionInfo {
@Override
public RaftPeerRole getCurrentRole() {
return getRole().getCurrentRole();
}
@Override
public boolean isLeaderReady() {
return isLeader() && getRole().isLeaderReady();
}
@Override
public LifeCycle.State getLifeCycleState() {
return lifeCycle.getCurrentState();
}
@Override
public RoleInfoProto getRoleInfoProto() {
return RaftServerImpl.this.getRoleInfoProto();
}
@Override
public long getCurrentTerm() {
return getState().getCurrentTerm();
}
@Override
public long getLastAppliedIndex() {
return getState().getLastAppliedIndex();
}
@Override
public long[] getFollowerNextIndices() {
return role.getLeaderState()
.filter(leader -> isLeader())
.map(LeaderStateImpl::getFollowerNextIndices)
.orElse(null);
}
}
private final RaftServerProxy proxy;
private final StateMachine stateMachine;
private final Info info = new Info();
private final DivisionProperties divisionProperties;
private final int maxTimeoutMs;
private final TimeDuration leaderStepDownWaitTime;
private final TimeDuration sleepDeviationThreshold;
private final boolean installSnapshotEnabled;
private final LifeCycle lifeCycle;
private final ServerState state;
private final RoleInfo role;
private final DataStreamMap dataStreamMap;
private final MemoizedSupplier<RaftClient> raftClient;
private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final RaftServerJmxAdapter jmxAdapter;
private final LeaderElectionMetrics leaderElectionMetrics;
private final RaftServerMetricsImpl raftServerMetrics;
private final AtomicLong inProgressInstallSnapshotRequest;
private final AtomicLong installedSnapshotIndex;
private final AtomicBoolean isSnapshotNull;
// Prevent handling append entry requests before start() completes.
// For example, if thread1 calls start() but, before it reaches startAsFollower(), thread2 receives an
// append entry request and changes the state to RUNNING via lifeCycle.compareAndTransition(STARTING, RUNNING),
// then thread1 executes lifeCycle.transition(RUNNING) in startAsFollower(),
// which throws IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING.
private final AtomicBoolean startComplete;
private final TransferLeadership transferLeadership;
private final SnapshotManagementRequestHandler snapshotRequestHandler;
private final ExecutorService serverExecutor;
private final ExecutorService clientExecutor;
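/**
* Create the division of this server for the given group: initialize the server
* state, retry cache and metrics, a memoized {@link RaftClient} for the group,
* and the server/client thread pools. The division does not serve requests
* until {@link #start()} is called.
*/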
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
this.lifeCycle = new LifeCycle(id);
this.stateMachine = stateMachine;
this.role = new RoleInfo(id);
final RaftProperties properties = proxy.getProperties();
this.divisionProperties = new DivisionPropertiesImpl(properties);
maxTimeoutMs = properties().maxRpcTimeoutMs();
leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.proxy = proxy;
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = new RetryCacheImpl(properties);
this.inProgressInstallSnapshotRequest = new AtomicLong();
this.installedSnapshotIndex = new AtomicLong();
this.isSnapshotNull = new AtomicBoolean(false);
this.dataStreamMap = new DataStreamMapImpl(id);
this.jmxAdapter = new RaftServerJmxAdapter();
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
this.startComplete = new AtomicBoolean(false);
this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
.setRaftGroup(group)
.setProperties(getRaftServer().getProperties())
.build());
this.transferLeadership = new TransferLeadership(this);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
RaftServerConfigKeys.ThreadPool.serverSize(properties),
id + "-server");
this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.clientCached(properties),
RaftServerConfigKeys.ThreadPool.clientSize(properties),
id + "-client");
}
@Override
public DivisionProperties properties() {
return divisionProperties;
}
LogAppender newLogAppender(LeaderState leaderState, FollowerInfo f) {
return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
}
int getMaxTimeoutMs() {
return maxTimeoutMs;
}
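/**
* Return a timeout chosen uniformly at random from [minRpcTimeout, maxRpcTimeout].
* Randomized election timeouts (as described in the Raft paper) make it unlikely
* that peers time out at the same moment and split the vote; for example, with
* min=150ms and max=300ms, each call picks some value in [150ms, 300ms].
*/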
TimeDuration getRandomElectionTimeout() {
final int min = properties().minRpcTimeoutMs();
final int millis = min + ThreadLocalRandom.current().nextInt(properties().maxRpcTimeoutMs() - min + 1);
return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
}
TimeDuration getLeaderStepDownWaitTime() {
return leaderStepDownWaitTime;
}
TimeDuration getSleepDeviationThreshold() {
return sleepDeviationThreshold;
}
@Override
public StateMachine getStateMachine() {
return stateMachine;
}
@Override
public RaftLog getRaftLog() {
return getState().getLog();
}
@Override
public RaftStorage getRaftStorage() {
return getState().getStorage();
}
@Override
public DataStreamMap getDataStreamMap() {
return dataStreamMap;
}
@Override
public RaftClient getRaftClient() {
return raftClient.get();
}
@Override
public RetryCacheImpl getRetryCache() {
return retryCache;
}
@Override
public RaftServerProxy getRaftServer() {
return proxy;
}
RaftServerRpc getServerRpc() {
return proxy.getServerRpc();
}
private void setRole(RaftPeerRole newRole, Object reason) {
LOG.info("{}: changes role from {} to {} at term {} for {}",
getMemberId(), this.role, newRole, state.getCurrentTerm(), reason);
this.role.transitionRole(newRole);
}
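/**
* Start this division: transition the life cycle from NEW to STARTING, start as
* a follower (or in an initializing state if this peer is not in the current
* configuration), register the JMX bean and start the server state.
*
* @return true if this call started the server; false if it was already started.
*/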
boolean start() {
if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
return false;
}
final RaftConfigurationImpl conf = getRaftConf();
if (conf != null && conf.containsInBothConfs(getId())) {
LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
startAsFollower();
} else {
LOG.info("{}: start with initializing state, conf={}", getMemberId(), conf);
startInitializing();
}
registerMBean(getId(), getMemberId().getGroupId(), jmxAdapter, jmxAdapter);
state.start();
startComplete.compareAndSet(false, true);
return true;
}
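/**
* Register the given {@link RaftServerMXBean} under a name of the form
* {@code Ratis:service=RaftServer,group=<groupId>,id=<id>}; if that name is
* rejected, retry with the id quoted by {@link ObjectName#quote(String)}.
*
* @return true if the bean was registered under either name.
*/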
static boolean registerMBean(
RaftPeerId id, RaftGroupId groupId, RaftServerMXBean mBean, JmxRegister jmx) {
final String prefix = "Ratis:service=RaftServer,group=" + groupId + ",id=";
final String registered = jmx.register(mBean, Arrays.asList(
() -> prefix + id,
() -> prefix + ObjectName.quote(id.toString())));
return registered != null;
}
/**
* The peer belongs to the current configuration and should start as a follower.
*/
private void startAsFollower() {
setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
role.startFollowerState(this, "startAsFollower");
lifeCycle.transition(RUNNING);
}
/**
* The peer does not have any configuration (maybe it will later be included
* in some configuration). It still starts as a follower but will not vote or
* start an election.
*/
private void startInitializing() {
setRole(RaftPeerRole.FOLLOWER, "startInitializing");
// do not start FollowerState
}
ServerState getState() {
return state;
}
@Override
public RaftGroupMemberId getMemberId() {
return getState().getMemberId();
}
@Override
public DivisionInfo getInfo() {
return info;
}
RoleInfo getRole() {
return role;
}
@Override
public RaftConfigurationImpl getRaftConf() {
return getState().getRaftConf();
}
/**
* Remove the group from this server.
* If the deleteDirectory flag is true, the group is permanently deleted.
* Otherwise, if the renameDirectory flag is true, the group directory is moved
* to the {@link RaftServerConfigKeys#REMOVED_GROUPS_DIR_KEY} location.
*/
void groupRemove(boolean deleteDirectory, boolean renameDirectory) {
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
/* Shutdown is triggered here in order to avoid any locked files. */
close();
getStateMachine().event().notifyGroupRemove();
if (deleteDirectory) {
for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) {
try {
FileUtils.deleteFully(dir.getRoot());
LOG.info("{}: Succeed to remove RaftStorageDirectory {}", getMemberId(), dir);
break;
} catch (NoSuchFileException e) {
LOG.warn("{}: Some file does not exist {}", getMemberId(), dir, e);
} catch (Exception ignored) {
LOG.error("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, ignored);
break;
}
}
} else if (renameDirectory) {
try {
/* Create path with current group in REMOVED_GROUPS_DIR_KEY location */
File toBeRemovedGroupFolder = new File(RaftServerConfigKeys
.removedGroupsDir(proxy.getProperties()),
dir.getRoot().getName());
FileUtils.moveDirectory(dir.getRoot().toPath(),
toBeRemovedGroupFolder.toPath());
LOG.info("{}: Group {} is renamed successfully", getMemberId(), getGroup());
} catch (IOException e) {
LOG.warn("{}: Failed to remove group {}", getMemberId(),
dir.getRoot().getName(), e);
}
}
}
@Override
public void close() {
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: shutdown", getMemberId());
try {
jmxAdapter.unregister();
} catch (Exception ignored) {
LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), ignored);
}
try {
role.shutdownFollowerState();
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), ignored);
}
try {
role.shutdownLeaderElection();
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), ignored);
}
try {
role.shutdownLeaderState(true);
} catch (Exception ignored) {
LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), ignored);
}
try {
state.close();
} catch (Exception ignored) {
LOG.warn("{}: Failed to close state", getMemberId(), ignored);
}
try {
leaderElectionMetrics.unregister();
raftServerMetrics.unregister();
RaftServerMetricsImpl.removeRaftServerMetrics(getMemberId());
} catch (Exception ignored) {
LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
}
try {
if (raftClient.isInitialized()) {
raftClient.get().close();
}
} catch (Exception ignored) {
LOG.warn("{}: Failed to close raft client", getMemberId(), ignored);
}
try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception ignored) {
LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", ignored);
}
try {
ConcurrentUtils.shutdownAndWait(serverExecutor);
} catch (Exception ignored) {
LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", ignored);
}
});
}
/**
* Change the server state to Follower if this server is in a different role or force is true.
* @param newTerm The new term.
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return true if the term/votedFor metadata was updated to the new term
*/
private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
final RaftPeerRole old = role.getCurrentRole();
final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
if (old != RaftPeerRole.FOLLOWER || force) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
role.shutdownLeaderState(false);
} else if (old == RaftPeerRole.CANDIDATE) {
role.shutdownLeaderElection();
} else if (old == RaftPeerRole.FOLLOWER) {
role.shutdownFollowerState();
}
role.startFollowerState(this, reason);
}
return metadataUpdated;
}
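/** Change to follower and, if the term was updated, persist the term/votedFor metadata. */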
synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException {
if (changeToFollower(newTerm, false, reason)) {
state.persistMetadata();
}
}
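/**
* Change this server from candidate to leader: shut down the leader election,
* start the leader state (which appends a placeholder entry for the new term)
* and apply the configuration from the appended entry.
*/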
synchronized void changeToLeader() {
Preconditions.assertTrue(getInfo().isCandidate());
role.shutdownLeaderElection();
setRole(RaftPeerRole.LEADER, "changeToLeader");
state.becomeLeader();
// start sending AppendEntries RPC to followers
final LogEntryProto e = role.startLeaderState(this);
getState().setRaftConf(e);
}
@Override
public Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
infos.add(updateCommitInfoCache());
// add the commit infos of other servers
if (getInfo().isLeader()) {
role.getLeaderState().ifPresent(
leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
} else {
getRaftConf().getAllPeers().stream()
.map(RaftPeer::getId)
.filter(id -> !id.equals(getId()))
.map(commitInfoCache::get)
.filter(Objects::nonNull)
.forEach(infos::add);
}
return infos;
}
GroupInfoReply getGroupInfo(GroupInfoRequest request) {
return new GroupInfoReply(request, getCommitInfos(),
getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
}
private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
.setSelf(getPeer().getRaftPeerProto())
.setRole(currentRole)
.setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
final Optional<FollowerState> fs = role.getFollowerState();
final ServerRpcProto leaderInfo =
ServerProtoUtils.toServerRpcProto(leaderPeerInfo,
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo)
.setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
return roleInfo.build();
}
RoleInfoProto getRoleInfoProto() {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
.setSelf(getPeer().getRaftPeerProto())
.setRole(currentRole)
.setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
switch (currentRole) {
case CANDIDATE:
CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
.setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
roleInfo.setCandidateInfo(candidate);
break;
case FOLLOWER:
final Optional<FollowerState> fs = role.getFollowerState();
final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
getRaftConf().getPeer(state.getLeaderId()),
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
// FollowerState can be null while adding a new peer as it is not
// a voting member yet
roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
.setLeaderInfo(leaderInfo)
.setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
break;
case LEADER:
role.getLeaderState().ifPresent(ls -> {
final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto(
f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
leader.setTerm(ls.getCurrentTerm());
roleInfo.setLeaderInfo(leader);
});
break;
default:
throw new IllegalStateException("incorrect role of server " + currentRole);
}
return roleInfo.build();
}
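/**
* Change this server from follower to candidate and start a leader election.
* The extended-no-leader event is sent to the state machine if the server has
* been without a leader for too long.
*
* @param forceStartLeaderElection if true (e.g. for a leadership transfer via
* {@link #startLeaderElection}), start the election right away.
*/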
synchronized void changeToCandidate(boolean forceStartLeaderElection) {
Preconditions.assertTrue(getInfo().isFollower());
role.shutdownFollowerState();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
if (state.shouldNotifyExtendedNoLeader()) {
stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
}
// start election
role.startLeaderElection(this, forceStartLeaderElection);
}
@Override
public String toString() {
return role + " " + state + " " + lifeCycle.getCurrentState();
}
RaftClientReply.Builder newReplyBuilder(RaftClientRequest request) {
return RaftClientReply.newBuilder()
.setRequest(request)
.setCommitInfos(getCommitInfos());
}
private RaftClientReply.Builder newReplyBuilder(ClientInvocationId invocationId, long logIndex) {
return RaftClientReply.newBuilder()
.setClientInvocationId(invocationId)
.setLogIndex(logIndex)
.setServerId(getMemberId())
.setCommitInfos(getCommitInfos());
}
RaftClientReply newSuccessReply(RaftClientRequest request) {
return newReplyBuilder(request)
.setSuccess()
.build();
}
RaftClientReply newSuccessReply(RaftClientRequest request, long logIndex) {
return newReplyBuilder(request)
.setSuccess()
.setLogIndex(logIndex)
.build();
}
RaftClientReply newExceptionReply(RaftClientRequest request, RaftException exception) {
return newReplyBuilder(request)
.setException(exception)
.build();
}
/**
* @return a failed reply future if the server is not a ready leader for this request;
* otherwise, return null.
*/
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry,
boolean isWrite) {
try {
assertGroup(request.getRequestorId(), request.getRaftGroupId());
} catch (GroupMismatchException e) {
return RetryCacheImpl.failWithException(e, entry);
}
if (!getInfo().isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, exception);
return RetryCacheImpl.failWithReply(reply, entry);
}
if (!getInfo().isLeaderReady()) {
final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request));
if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
return cacheEntry.getReplyFuture();
}
final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId());
final RaftClientReply reply = newExceptionReply(request, lnre);
return RetryCacheImpl.failWithReply(reply, entry);
}
if (isWrite && isSteppingDown()) {
final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down");
final RaftClientReply reply = newExceptionReply(request, lsde);
return RetryCacheImpl.failWithReply(reply, entry);
}
return null;
}
NotLeaderException generateNotLeaderException() {
if (lifeCycle.getCurrentState() != RUNNING) {
return new NotLeaderException(getMemberId(), null, null);
}
RaftPeerId leaderId = state.getLeaderId();
if (leaderId == null || leaderId.equals(getId())) {
// We have no idea who the current leader is, or this peer is the current
// leader but it is about to step down. Set the suggested leader to null.
leaderId = null;
}
final RaftConfigurationImpl conf = getRaftConf();
Collection<RaftPeer> peers = conf.getAllPeers();
return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), peers);
}
private LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyException {
return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
getMemberId() + " is not in " + expected + ": current state is " + c),
expected);
}
void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException {
final RaftGroupId groupId = getMemberId().getGroupId();
if (!groupId.equals(requestorGroupId)) {
throw new GroupMismatchException(getMemberId()
+ ": The group (" + requestorGroupId + ") of " + requestorId
+ " does not match the group (" + groupId + ") of the server " + getId());
}
}
/**
* Handle a normal update request from client.
*/
private CompletableFuture<RaftClientReply> appendTransaction(
RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
CompletableFuture<RaftClientReply> reply;
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, cacheEntry, true);
if (reply != null) {
return reply;
}
// append the message to its local log
final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
cacheEntry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for " + request));
return cacheEntry.getReplyFuture();
}
try {
state.appendLog(context);
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
// leader will step down here
if (e.leaderShouldStepDown() && getInfo().isLeader()) {
leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
}
return CompletableFuture.completedFuture(exceptionReply);
}
// put the request into the pending queue
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
cacheEntry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " + request));
return cacheEntry.getReplyFuture();
}
leaderState.notifySenders();
}
return pending.getFuture();
}
void stepDownOnJvmPause() {
role.getLeaderState().ifPresent(leader -> leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
}
private RaftClientRequest filterDataStreamRaftClientRequest(RaftClientRequest request)
throws InvalidProtocolBufferException {
return !request.is(TypeCase.FORWARD) ? request : ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(
request.getMessage().getContent().asReadOnlyByteBuffer()));
}
<REPLY> CompletableFuture<REPLY> executeSubmitServerRequestAsync(
CheckedSupplier<CompletableFuture<REPLY>, IOException> submitFunction) {
return CompletableFuture.supplyAsync(
() -> JavaUtils.callAsUnchecked(submitFunction, CompletionException::new),
serverExecutor).join();
}
CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientRequest request) {
return CompletableFuture.supplyAsync(
() -> JavaUtils.callAsUnchecked(() -> submitClientRequestAsync(request), CompletionException::new),
clientExecutor).join();
}
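/**
* Dispatch a client request by type: stale-read and read requests query the
* state machine, watch and stream requests are handled by the leader state,
* and write requests go through the retry cache and {@link #appendTransaction}.
*/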
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Optional<Timer> timer = Optional.ofNullable(raftServerMetrics.getClientRequestTimer(request.getType()));
final CompletableFuture<RaftClientReply> replyFuture;
if (request.is(TypeCase.STALEREAD)) {
replyFuture = staleReadAsync(request);
} else {
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null,
!request.is(TypeCase.READ) && !request.is(TypeCase.WATCH));
if (reply != null) {
return reply;
}
// let the state machine handle read-only request from client
RaftClientRequest.Type type = request.getType();
if (type.is(TypeCase.MESSAGESTREAM)) {
if (type.getMessageStream().getEndOfRequest()) {
final CompletableFuture<RaftClientRequest> f = streamEndOfRequestAsync(request);
if (f.isCompletedExceptionally()) {
return f.thenApply(r -> null);
}
request = f.join();
type = request.getType();
}
}
if (type.is(TypeCase.READ)) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
replyFuture = processQueryFuture(stateMachine.query(request.getMessage()), request);
} else if (type.is(TypeCase.WATCH)) {
replyFuture = watchAsync(request);
} else if (type.is(TypeCase.MESSAGESTREAM)) {
replyFuture = streamAsync(request);
} else {
// query the retry cache
final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request));
final CacheEntry cacheEntry = queryResult.getEntry();
if (queryResult.isRetry()) {
// if the previous attempt is still pending or it succeeded, return its
// future
replyFuture = cacheEntry.getReplyFuture();
} else {
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
TransactionContext context = stateMachine.startTransaction(filterDataStreamRaftClientRequest(request));
if (context.getException() != null) {
final StateMachineException e = new StateMachineException(getMemberId(), context.getException());
final RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
replyFuture = CompletableFuture.completedFuture(exceptionReply);
} else {
replyFuture = appendTransaction(request, context, cacheEntry);
}
}
}
}
final RaftClientRequest.Type type = request.getType();
replyFuture.whenComplete((clientReply, exception) -> {
// clientReply is null if the future completed exceptionally, so check the exception first.
if (exception != null || clientReply.getException() != null) {
raftServerMetrics.incFailedRequestCount(type);
} else if (clientReply.isSuccess()) {
timer.map(Timer::time).ifPresent(Timer.Context::stop);
}
});
return replyFuture;
}
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
.orElseGet(() -> CompletableFuture.completedFuture(
newExceptionReply(request, generateNotLeaderException())));
}
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
final long minIndex = request.getType().getStaleRead().getMinIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex, commitIndex);
if (commitIndex < minIndex) {
final StaleReadException e = new StaleReadException(
"Unable to serve stale-read due to server commit index = " + commitIndex + " < min = " + minIndex);
return CompletableFuture.completedFuture(
newExceptionReply(request, new StateMachineException(getMemberId(), e)));
}
return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
}
private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.streamAsync(request))
.orElseGet(() -> CompletableFuture.completedFuture(
newExceptionReply(request, generateNotLeaderException())));
}
private CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.streamEndOfRequestAsync(request))
.orElse(null);
}
CompletableFuture<RaftClientReply> processQueryFuture(
CompletableFuture<Message> queryFuture, RaftClientRequest request) {
return queryFuture.thenApply(r -> newReplyBuilder(request).setSuccess().setMessage(r).build())
.exceptionally(e -> {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof StateMachineException) {
return newExceptionReply(request, (StateMachineException)e);
}
throw new CompletionException(e);
});
}
@Override
public RaftClientReply submitClientRequest(RaftClientRequest request)
throws IOException {
return waitForReply(request, submitClientRequestAsync(request));
}
RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftClientReply> future)
throws IOException {
return waitForReply(getMemberId(), request, future, e -> newExceptionReply(request, e));
}
static <REPLY extends RaftClientReply> REPLY waitForReply(
Object id, RaftClientRequest request, CompletableFuture<REPLY> future,
Function<RaftException, REPLY> exceptionReply)
throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
final String s = id + ": Interrupted when waiting for reply, request=" + request;
LOG.info(s, e);
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(s, e);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause == null) {
throw new IOException(e);
}
if (cause instanceof NotLeaderException ||
cause instanceof StateMachineException) {
final REPLY reply = exceptionReply.apply((RaftException) cause);
if (reply != null) {
return reply;
}
}
throw IOUtils.asIOException(cause);
}
}
public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
return waitForReply(request, transferLeadershipAsync(request));
}
private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
TransferLeadershipRequest request, String msg) {
LOG.warn(msg);
return CompletableFuture.completedFuture(
newExceptionReply(request, new TransferLeadershipException(msg)));
}
boolean isSteppingDown() {
return transferLeadership.isSteppingDown();
}
void finishTransferLeadership() {
transferLeadership.finish(state.getLeaderId(), false);
}
public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
throws IOException {
LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
synchronized (this) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
if (reply != null) {
return reply;
}
if (getId().equals(request.getNewLeader())) {
return CompletableFuture.completedFuture(newSuccessReply(request));
}
final RaftConfigurationImpl conf = getRaftConf();
final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
// make sure there is no raft reconfiguration in progress
if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
" when raft reconfiguration in progress.";
return logAndReturnTransferLeadershipFail(request, msg);
}
if (!conf.containsInConf(request.getNewLeader())) {
String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
" as it is not in " + conf;
return logAndReturnTransferLeadershipFail(request, msg);
}
if (!conf.isHighestPriority(request.getNewLeader())) {
String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
" as it does not has highest priority " + conf;
return logAndReturnTransferLeadershipFail(request, msg);
}
return transferLeadership.start(request);
}
}
CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
//TODO(liuyaolong): get the gap value from shell command
long minGapValue = RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
final long lastSnapshotIndex = Optional.ofNullable(stateMachine.getLatestSnapshot())
.map(SnapshotInfo::getIndex)
.orElse(0L);
if (state.getLastAppliedIndex() - lastSnapshotIndex < minGapValue) {
return CompletableFuture.completedFuture(newSuccessReply(request, lastSnapshotIndex));
}
synchronized (this) {
long installSnapshot = inProgressInstallSnapshotRequest.get();
// check snapshot install/load
if (installSnapshot != 0) {
String msg = String.format("%s: Failed do snapshot as snapshot (%s) installation is in progress",
getMemberId(), installSnapshot);
LOG.warn(msg);
return CompletableFuture.completedFuture(newExceptionReply(request, new RaftException(msg)));
}
return snapshotRequestHandler.takingSnapshotAsync(request);
}
}
SnapshotManagementRequestHandler getSnapshotRequestHandler() {
return snapshotRequestHandler;
}
CompletableFuture<RaftClientReply> setLeaderElectionAsync(LeaderElectionRequest request) throws IOException {
LOG.info("{} receive pauseLeaderElection {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
final LeaderElectionRequest.Pause pause = request.getPause();
if (pause != null) {
getRole().setLeaderElectionPause(true);
return CompletableFuture.completedFuture(newSuccessReply(request));
}
final LeaderElectionRequest.Resume resume = request.getResume();
if (resume != null) {
getRole().setLeaderElectionPause(false);
return CompletableFuture.completedFuture(newSuccessReply(request));
}
return JavaUtils.completeExceptionally(new UnsupportedOperationException(
getId() + ": Request not supported " + request));
}
public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
return waitForReply(request, setConfigurationAsync(request));
}
/**
* Handle a raft configuration change request from client.
*/
public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException {
LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, true);
if (reply != null) {
return reply;
}
final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, null, false);
if (reply != null) {
return reply;
}
final RaftConfigurationImpl current = getRaftConf();
final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
// make sure there is no other raft reconfiguration in progress
if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
throw new ReconfigurationInProgressException(
"Reconfiguration is already in progress: " + current);
}
// return success with a null message if the new conf is the same as the current
if (current.hasNoChange(peersInNewConf)) {
pending = new PendingRequest(request);
pending.setReply(newSuccessReply(request));
return pending.getFuture();
}
getRaftServer().addRaftPeers(peersInNewConf);
// add staging state into the leaderState
pending = leaderState.startSetConfiguration(request);
}
return pending.getFuture();
}
/**
* Check whether the remote peer is not included in the current conf
* and should shut down. It should shut down if all of the following hold:
* 1. this server is the leader
* 2. the current conf is stable and has been committed
* 3. the candidate id is not included in the conf
* 4. the candidate's last entry's index < the conf's index
*/
private boolean shouldSendShutdown(RaftPeerId candidateId,
TermIndex candidateLastEntry) {
return getInfo().isLeader()
&& getRaftConf().isStable()
&& getState().isConfCommitted()
&& !getRaftConf().containsInConf(candidateId)
&& candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
&& role.getLeaderState().map(ls -> !ls.isBootStrappingPeer(candidateId)).orElse(false);
}
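/**
* Handle a requestVote RPC for either the PRE_VOTE or the ELECTION phase.
* In the ELECTION phase, a recognized candidate forces this server to follower;
* a granted vote or an updated term is persisted to the metadata file.
*/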
@Override
public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException {
final RaftRpcRequestProto request = r.getServerRequest();
return requestVote(r.getPreVote() ? Phase.PRE_VOTE : Phase.ELECTION,
RaftPeerId.valueOf(request.getRequestorId()),
ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
r.getCandidateTerm(),
TermIndex.valueOf(r.getCandidateLastEntry()));
}
private RequestVoteReplyProto requestVote(Phase phase,
RaftPeerId candidateId, RaftGroupId candidateGroupId,
long candidateTerm, TermIndex candidateLastEntry) throws IOException {
CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
candidateId, candidateTerm, candidateLastEntry);
LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(candidateId, candidateGroupId);
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.RUNNING);
final VoteContext context = new VoteContext(this, phase, candidateId);
final RaftPeer candidate = context.recognizeCandidate(candidateTerm);
final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
if (candidate != null && phase == Phase.ELECTION) {
// change server state in the ELECTION phase
final boolean termUpdated = changeToFollower(candidateTerm, true, "candidate:" + candidateId);
if (voteGranted) {
state.grantVote(candidate.getId());
}
if (termUpdated || voteGranted) {
state.persistMetadata(); // sync metafile
}
}
if (voteGranted) {
role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE));
} else if (shouldSendShutdown(candidateId, candidateLastEntry)) {
shouldShutdown = true;
}
reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
voteGranted, state.getCurrentTerm(), shouldShutdown);
if (LOG.isInfoEnabled()) {
LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
getMemberId(), phase, ServerStringUtils.toRequestVoteReplyString(reply), state);
}
}
return reply;
}
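/**
* Validate that the entries immediately follow the previous {@link TermIndex},
* have consecutive indices, and carry terms no greater than the expected term.
*/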
private void validateEntries(long expectedTerm, TermIndex previous,
LogEntryProto... entries) {
if (entries != null && entries.length > 0) {
final long index0 = entries[0].getIndex();
if (previous == null || previous.getTerm() == 0) {
Preconditions.assertTrue(index0 == 0,
"Unexpected Index: previous is null but entries[%s].getIndex()=%s",
0, index0);
} else {
Preconditions.assertTrue(previous.getIndex() == index0 - 1,
"Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
previous, 0, index0);
}
for (int i = 0; i < entries.length; i++) {
final long t = entries[i].getTerm();
Preconditions.assertTrue(expectedTerm >= t,
"Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
i, t, expectedTerm);
final long indexi = entries[i].getIndex();
Preconditions.assertTrue(indexi == index0 + i,
"Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
i, indexi, index0);
}
}
}
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
throws IOException {
try {
return appendEntriesAsync(r).join();
} catch (CompletionException e) {
throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e));
}
}
@Override
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto r)
throws IOException {
// TODO avoid converting list to array
final RaftRpcRequestProto request = r.getServerRequest();
final LogEntryProto[] entries = r.getEntriesList()
.toArray(new LogEntryProto[r.getEntriesCount()]);
final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null;
final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
previous, r.getLeaderCommit(), r.getInitializing(), entries);
try {
return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries);
} catch (Exception t) {
LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
throw t;
}
}
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
if (isHeartbeat) {
if (LOG.isTraceEnabled()) {
LOG.trace("HEARTBEAT: " + message.get());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(message.get());
}
}
}
private Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType updateType) {
final Optional<FollowerState> fs = role.getFollowerState();
if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) {
fs.get().updateLastRpcTime(updateType);
return fs;
} else {
return Optional.empty();
}
}
private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException {
CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
if (!startComplete.get()) {
throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
}
assertGroup(leaderId, leaderGroupId);
try {
validateEntries(leaderTerm, previous, entries);
} catch (IllegalArgumentException e) {
throw new IOException(e);
}
}
private CommitInfoProto updateCommitInfoCache() {
return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
}
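/**
* Follower-side appendEntries: reply NOT_LEADER if the sender is not recognized,
* otherwise step down to follower, reply INCONSISTENCY if the entries conflict
* with the local state, and otherwise append the entries to the local log and
* reply SUCCESS once they have been flushed.
*/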
@SuppressWarnings("checkstyle:parameternumber")
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
final boolean isHeartbeat = entries.length == 0;
logAppendEntries(isHeartbeat,
() -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
+ previous + ", " + leaderCommit + ", " + initializing
+ ", commits" + ProtoUtils.toString(commitInfos)
+ ", entries: " + LogProtoUtils.toLogEntriesString(entries));
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
Timer.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), NOT_LEADER, callId,
RaftLog.INVALID_LOG_INDEX, isHeartbeat);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
getMemberId(), leaderId, leaderTerm, state, ServerStringUtils.toAppendEntriesReplyString(reply));
}
return CompletableFuture.completedFuture(reply);
}
try {
changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
state.setLeader(leaderId, "appendEntries");
if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
role.startFollowerState(this, Op.APPEND_ENTRIES);
}
followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);
// Check that the append entries are not inconsistent. There are 3
// scenarios which can result in inconsistency:
// 1. There is a snapshot installation in progress
// 2. There is an overlap between the snapshot index and the entries
// 3. There is a gap between the local log and the entries
// In any of these scenarios, we should return an INCONSISTENCY reply
// back to leader so that the leader can update this follower's next index.
AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries(
leaderId, currentTerm, followerCommit, previous, callId, isHeartbeat, entries);
if (inconsistencyReply != null) {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
return CompletableFuture.completedFuture(inconsistencyReply);
}
state.updateConfiguration(entries);
}
final List<CompletableFuture<Long>> futures = entries.length == 0 ? Collections.emptyList()
: state.getLog().append(entries);
commitInfos.forEach(commitInfoCache::update);
if (!isHeartbeat) {
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
}
return JavaUtils.allOf(futures).whenCompleteAsync(
(r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.length);
state.updateCommitIndex(commitIndex, currentTerm, false);
updateCommitInfoCache();
final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
final long matchIndex = entries.length != 0 ? entries[entries.length - 1].getIndex() :
RaftLog.INVALID_LOG_INDEX;
reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
state.getLog().getLastCommittedIndex(), n, SUCCESS, callId, matchIndex,
isHeartbeat);
}
logAppendEntries(isHeartbeat, () -> getMemberId() + ": succeeded to handle AppendEntries. Reply: "
+ ServerStringUtils.toAppendEntriesReplyString(reply));
timer.stop(); // TODO: future never completes exceptionally?
return reply;
});
}
private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, LogEntryProto... entries) {
final long replyNextIndex = checkInconsistentAppendEntries(previous, entries);
if (replyNextIndex == -1) {
return null;
}
final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, INCONSISTENCY, callId,
RaftLog.INVALID_LOG_INDEX, isHeartbeat);
LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), ServerStringUtils.toAppendEntriesReplyString(reply));
return reply;
}
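/**
* @return -1 if the entries are consistent with the local state; otherwise, the
* next index to suggest to the leader (when a snapshot installation is in
* progress, the first entry overlaps the snapshot/commit index, or the
* previous entry is not found locally).
*/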
private long checkInconsistentAppendEntries(TermIndex previous, LogEntryProto... entries) {
// Check if a snapshot installation through state machine is in progress.
final long installSnapshot = inProgressInstallSnapshotRequest.get();
if (installSnapshot != 0) {
LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in progress", getMemberId(), installSnapshot);
return state.getNextIndex();
}
// Check that the first log entry's index is greater than both the latest snapshot's index and the
// follower's last committed index. If not, reply to the leader with a new next index.
if (entries != null && entries.length > 0) {
final long firstEntryIndex = entries[0].getIndex();
final long snapshotIndex = state.getSnapshotIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
final long nextIndex = Math.max(snapshotIndex, commitIndex);
if (nextIndex > 0 && nextIndex >= firstEntryIndex) {
LOG.info("{}: Failed appendEntries as the first entry (index {})" +
" already exists (snapshotIndex: {}, commitIndex: {})",
getMemberId(), firstEntryIndex, snapshotIndex, commitIndex);
return nextIndex + 1;
}
}
// Check if "previous" is contained in current state.
if (previous != null && !state.containsTermIndex(previous)) {
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
return replyNextIndex;
}
return -1;
}
@Override
public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("{}: receive installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotRequestString(request));
}
final InstallSnapshotReplyProto reply;
try {
reply = installSnapshotImpl(request);
} catch (Exception e) {
LOG.error("{}: installSnapshot failed", getMemberId(), e);
throw e;
}
if (LOG.isInfoEnabled()) {
LOG.info("{}: reply installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotReplyString(reply));
}
return reply;
}
void setLeaderElectionPause(boolean pause) throws ServerNotReadyException {
assertLifeCycleState(LifeCycle.States.RUNNING);
role.setLeaderElectionPause(pause);
}
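/**
* Pause this server: transition RUNNING to PAUSING, pause the state machine,
* and then transition to PAUSED.
*
* @return true if the server was paused by this call; false if it was not RUNNING.
*/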
boolean pause() {
// TODO: should pause() be limited to only work for a follower?
// Now the lifeCycle state should be PAUSING, which will prevent other future operations.
// pause() should pause ongoing operations:
// a. call {@link StateMachine#pause()}.
synchronized (this) {
if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
return false;
}
// TODO: any other operations that need to be paused?
stateMachine.pause();
lifeCycle.compareAndTransition(PAUSING, PAUSED);
}
return true;
}
boolean resume() throws IOException {
synchronized (this) {
if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
return false;
}
// TODO: any other operations that need to be resumed?
try {
stateMachine.reinitialize();
} catch (IOException e) {
LOG.warn("Failed to reinitialize statemachine: {}", stateMachine);
lifeCycle.compareAndTransition(STARTING, EXCEPTION);
throw e;
}
lifeCycle.compareAndTransition(STARTING, RUNNING);
}
return true;
}
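/**
* Handle a startLeaderElection request sent by the current leader to transfer
* leadership. The request is refused if the sender is not recognized as the
* leader, this server is not a follower, or its log is behind the leader's
* last entry; otherwise this server changes to candidate immediately.
*/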
@Override
public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);
LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{},",
getMemberId(), leaderId, request.getLeaderLastEntry());
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(leaderId, leaderGroupId);
synchronized (this) {
// leaderLastEntry should not be null because LeaderStateImpl#start appends a placeholder entry,
// so the leader at each term should have at least one entry
if (leaderLastEntry == null) {
LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId());
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm());
if (!recognized) {
LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}",
getMemberId(), leaderId, leaderLastEntry.getTerm(), state);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
if (!getInfo().isFollower()) {
LOG.warn("{} refused StartLeaderElectionRequest from {}, because role is:{}",
getMemberId(), leaderId, role.getCurrentRole());
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) {
LOG.warn("{} refused StartLeaderElectionRequest from {}, because its lastEntry {} is less than the leaderLastEntry {}",
getMemberId(), leaderId, state.getLastEntry(), leaderLastEntry);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
changeToCandidate(true);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
}
}
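/**
* Either check and install a snapshot chunk sent by the leader (when
* install-snapshot is enabled) or notify the state machine to install a
* snapshot by itself (when disabled). A CONF_MISMATCH reply indicates that the
* leader and this follower disagree on
* {@link RaftServerConfigKeys.Log.Appender#INSTALL_SNAPSHOT_ENABLED_KEY}.
*/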
private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(),
leaderId, request);
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
assertGroup(leaderId, leaderGroupId);
InstallSnapshotReplyProto reply = null;
// Check if install snapshot from Leader is enabled
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
if (request.hasSnapshotChunk()) {
reply = checkAndInstallSnapshot(request, leaderId);
}
} else {
// Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
if (request.hasNotification()) {
reply = notifyStateMachineToInstallSnapshot(request, leaderId);
}
}
if (reply != null) {
if (request.hasLastRaftConfigurationLogEntryProto()) {
// Set the configuration included in the snapshot
LogEntryProto newConfLogEntryProto =
request.getLastRaftConfigurationLogEntryProto();
LOG.info("{}: set new configuration {} from snapshot", getMemberId(),
newConfLogEntryProto);
state.setRaftConf(newConfLogEntryProto);
state.writeRaftConfiguration(newConfLogEntryProto);
stateMachine.event().notifyConfigurationChanged(newConfLogEntryProto.getTerm(), newConfLogEntryProto.getIndex(),
newConfLogEntryProto.getConfigurationEntry());
}
return reply;
}
// There is a mismatch between configurations on leader and follower.
final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
return failedReply;
}
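/**
 * Verify that the request comes from a recognized leader and then install the given
 * snapshot chunk under the server lock. When the last chunk has been installed,
 * reload the state machine at the snapshot's last included index.
 */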
private InstallSnapshotReplyProto checkAndInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId());
return reply;
}
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
try {
// Check and append the snapshot chunk. We simply hold the lock here,
// since a follower peer requiring a snapshot installation does not
// receive many requests.
Preconditions.assertTrue(
state.getLog().getNextIndex() <= lastIncludedIndex,
"%s log's next index is %s, last included index in snapshot is %s",
getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);
//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncludedIndex);
}
} finally {
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
}
}
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
}
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
}
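/**
 * Notify the StateMachine to install a snapshot by itself.
 * Possible replies: ALREADY_INSTALLED if the local snapshot already covers the leader's
 * first available log index; SNAPSHOT_UNAVAILABLE if the StateMachine has no snapshot
 * to install; SNAPSHOT_INSTALLED with the installed index once an installation has
 * completed; otherwise IN_PROGRESS while the asynchronous installation is still running.
 */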
private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
return reply;
}
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
long snapshotIndex = state.getSnapshotIndex();
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
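// Atomically mark a snapshot installation as in progress; if the compareAndSet fails,
// another installation is already being handled and we fall through to the status checks below.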
if (inProgressInstallSnapshotRequest.compareAndSet(0, firstAvailableLogIndex)) {
LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}
Optional<RaftPeerProto> leaderPeerInfo = Optional.empty();
if (request.hasLastRaftConfigurationLogEntryProto()) {
List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
.getPeersList();
leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
Preconditions.assertTrue(leaderPeerInfo.isPresent());
}
// For the case where the RaftConf is empty on a newly started peer
// (i.e., an empty peer list), retrieve the leader info from the
// InstallSnapshotRequestProto instead.
RoleInfoProto roleInfoProto =
getRaftConf().getPeer(state.getLeaderId()) == null ?
getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
getRoleInfoProto();
// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
try {
stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
getMemberId(), exception.getMessage());
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
return;
}
if (reply != null) {
LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
getMemberId(), reply.getIndex());
stateMachine.pause();
state.updateInstalledSnapshotIndex(reply);
state.reloadStateMachine(reply.getIndex());
installedSnapshotIndex.set(reply.getIndex());
} else {
isSnapshotNull.set(true);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine could not install snapshot as it is not available", this);
}
}
// wait up to 1 second for the state machine to install the snapshot
}).get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Restore the interrupt status; the installation may still complete asynchronously.
Thread.currentThread().interrupt();
} catch (TimeoutException t) {
// The state machine has not completed within the wait; fall through to the status checks below.
} catch (Exception t) {
// There are two cases:
// (1) get() may throw an ExecutionException if the whenComplete callback throws;
// (2) StateMachine#notifyInstallSnapshotFromLeader itself may throw an arbitrary
// exception, depending on the implementation of the user's state machine.
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
final String err = getMemberId() + ": Failed to notify StateMachine to InstallSnapshot.";
LOG.warn(err + " " + t);
throw new IOException(err, t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId());
}
}
// If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
if (isSnapshotNull.compareAndSet(true, false)) {
LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
// installedSnapshotIndex to 0.
long latestInstalledSnapshotIndex = this.installedSnapshotIndex.getAndSet(0);
if (latestInstalledSnapshotIndex > 0) {
LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex);
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex);
}
// Otherwise, Snapshot installation is in progress.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
}
}
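/** If this server is the leader, submit an event to update the commit index. */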
void submitUpdateCommitEvent() {
role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
}
/**
* The log has been submitted to the state machine. Use the future to update
* the pending requests and retry cache.
* @param logEntry the log entry that has been submitted to the state machine
* @param stateMachineFuture the future returned by the state machine
* from which we will get transaction result later
*/
private CompletableFuture<Message> replyPendingRequest(
LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) {
Preconditions.assertTrue(logEntry.hasStateMachineLogEntry());
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
// update the retry cache
final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
if (getInfo().isLeader()) {
Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(),
"retry cache entry should be pending: %s", cacheEntry);
}
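// If a previous attempt has failed this entry, replace it with a fresh pending entry
// so that the retry can be processed.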
if (cacheEntry.isFailed()) {
retryCache.refreshEntry(new CacheEntry(cacheEntry.getKey()));
}
final long logIndex = logEntry.getIndex();
return stateMachineFuture.whenComplete((reply, exception) -> {
final RaftClientReply.Builder b = newReplyBuilder(invocationId, logIndex);
final RaftClientReply r;
if (exception == null) {
r = b.setSuccess().setMessage(reply).build();
} else {
// The exception comes from the state machine; wrap it in the
// reply as a StateMachineException.
final StateMachineException e = new StateMachineException(getMemberId(), exception);
r = b.setException(e).build();
}
// update pending request
role.getLeaderState().ifPresent(leader -> leader.replyPendingRequest(logIndex, r));
cacheEntry.updateResult(r);
});
}
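/**
 * Apply the given committed log entry to the StateMachine.
 * For a configuration entry, record the new conf in the metadata file and notify the
 * StateMachine; for a state machine entry, apply the transaction and link the resulting
 * future to the pending client request via replyPendingRequest.
 *
 * @return a future of the transaction result, or null if the entry contains no state machine data.
 */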
CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws RaftLogIOException {
if (!next.hasStateMachineLogEntry()) {
stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex());
}
if (next.hasConfigurationEntry()) {
// The reply should have already been set; we only need to record
// the new conf in the metadata file and notify the StateMachine.
state.writeRaftConfiguration(next);
stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry());
} else if (next.hasStateMachineLogEntry()) {
// Reuse the TransactionContext from the leader state if we are the leader; otherwise build a new one.
TransactionContext trx = role.getLeaderState()
.map(leader -> leader.getTransactionContext(next.getIndex())).orElseGet(
() -> TransactionContext.newBuilder()
.setServerRole(role.getCurrentRole())
.setStateMachine(stateMachine)
.setLogEntry(next)
.build());
try {
// Let the StateMachine inject logic for committed transactions in sequential order.
trx = stateMachine.applyTransactionSerial(trx);
final CompletableFuture<Message> stateMachineFuture = stateMachine.applyTransaction(trx);
return replyPendingRequest(next, stateMachineFuture);
} catch (Exception e) {
throw new RaftLogIOException(e);
}
}
return null;
}
/**
* The given log entry is being truncated.
* Fail the corresponding client request, if there is any.
*
* @param logEntry the log entry being truncated
*/
void notifyTruncatedLogEntry(LogEntryProto logEntry) {
if (logEntry.hasStateMachineLogEntry()) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
final CacheEntry cacheEntry = getRetryCache().getIfPresent(invocationId);
if (cacheEntry != null) {
cacheEntry.failWithReply(newReplyBuilder(invocationId, logEntry.getIndex())
.setException(generateNotLeaderException())
.build());
}
}
}
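/** @return the {@link LeaderElectionMetrics} of this server. */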
LeaderElectionMetrics getLeaderElectionMetrics() {
return leaderElectionMetrics;
}
@Override
public RaftServerMetricsImpl getRaftServerMetrics() {
return raftServerMetrics;
}
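/** A JMX adapter to expose the state of this server via {@link RaftServerMXBean}. */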
private class RaftServerJmxAdapter extends JmxRegister implements RaftServerMXBean {
@Override
public String getId() {
return getMemberId().getPeerId().toString();
}
@Override
public String getLeaderId() {
RaftPeerId leaderId = getState().getLeaderId();
if (leaderId != null) {
return leaderId.toString();
} else {
return null;
}
}
@Override
public long getCurrentTerm() {
return getState().getCurrentTerm();
}
@Override
public String getGroupId() {
return getMemberId().getGroupId().toString();
}
@Override
public String getRole() {
return role.toString();
}
@Override
public List<String> getFollowers() {
return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElse(Collections.emptyList())
.stream().map(RaftPeer::toString).collect(Collectors.toList());
}
@Override
public List<String> getGroups() {
return proxy.getGroupIds().stream().map(RaftGroupId::toString)
.collect(Collectors.toList());
}
}
}