| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.server.impl; |
| |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.ratis.client.RaftClient; |
| import org.apache.ratis.client.impl.ClientProtoUtils; |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.proto.RaftProtos.*; |
| import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; |
| import org.apache.ratis.protocol.*; |
| import org.apache.ratis.protocol.exceptions.GroupMismatchException; |
| import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; |
| import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; |
| import org.apache.ratis.protocol.exceptions.NotLeaderException; |
| import org.apache.ratis.protocol.exceptions.RaftException; |
| import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException; |
| import org.apache.ratis.protocol.exceptions.ResourceUnavailableException; |
| import org.apache.ratis.protocol.exceptions.ServerNotReadyException; |
| import org.apache.ratis.protocol.exceptions.StaleReadException; |
| import org.apache.ratis.protocol.exceptions.StateMachineException; |
| import org.apache.ratis.protocol.exceptions.TransferLeadershipException; |
| import org.apache.ratis.server.DataStreamMap; |
| import org.apache.ratis.server.DivisionInfo; |
| import org.apache.ratis.server.DivisionProperties; |
| import org.apache.ratis.server.leader.FollowerInfo; |
| import org.apache.ratis.server.RaftServer; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.RaftServerMXBean; |
| import org.apache.ratis.server.RaftServerRpc; |
| import org.apache.ratis.server.impl.LeaderElection.Phase; |
| import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry; |
| import org.apache.ratis.server.leader.LeaderState; |
| import org.apache.ratis.server.leader.LogAppender; |
| import org.apache.ratis.server.metrics.LeaderElectionMetrics; |
| import org.apache.ratis.server.metrics.RaftServerMetricsImpl; |
| import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; |
| import org.apache.ratis.server.protocol.RaftServerProtocol; |
| import org.apache.ratis.server.protocol.TermIndex; |
| import org.apache.ratis.server.raftlog.LogProtoUtils; |
| import org.apache.ratis.server.raftlog.RaftLog; |
| import org.apache.ratis.server.raftlog.RaftLogIOException; |
| import org.apache.ratis.server.storage.RaftStorage; |
| import org.apache.ratis.server.storage.RaftStorageDirectory; |
| import org.apache.ratis.server.util.ServerStringUtils; |
| import org.apache.ratis.statemachine.SnapshotInfo; |
| import org.apache.ratis.statemachine.StateMachine; |
| import org.apache.ratis.statemachine.TransactionContext; |
| import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; |
| import org.apache.ratis.util.*; |
| |
| import javax.management.ObjectName; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.NoSuchFileException; |
| import java.util.*; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; |
| import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; |
| import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; |
| import static org.apache.ratis.util.LifeCycle.State.EXCEPTION; |
| import static org.apache.ratis.util.LifeCycle.State.NEW; |
| import static org.apache.ratis.util.LifeCycle.State.PAUSED; |
| import static org.apache.ratis.util.LifeCycle.State.PAUSING; |
| import static org.apache.ratis.util.LifeCycle.State.RUNNING; |
| import static org.apache.ratis.util.LifeCycle.State.STARTING; |
| |
| import com.codahale.metrics.Timer; |
| import org.apache.ratis.util.function.CheckedSupplier; |
| |
| class RaftServerImpl implements RaftServer.Division, |
| RaftServerProtocol, RaftServerAsynchronousProtocol, |
| RaftClientProtocol, RaftClientAsynchronousProtocol{ |
  private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
  // Qualified operation names for the server RPC handlers
  // (presumably used as metric/timer keys elsewhere — not visible in this chunk).
  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
  static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
  static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
| |
  /** A live view of this division's status; each call reads the server's current state. */
  class Info implements DivisionInfo {
    @Override
    public RaftPeerRole getCurrentRole() {
      return getRole().getCurrentRole();
    }

    @Override
    public boolean isLeaderReady() {
      return isLeader() && getRole().isLeaderReady();
    }

    @Override
    public LifeCycle.State getLifeCycleState() {
      return lifeCycle.getCurrentState();
    }

    @Override
    public RoleInfoProto getRoleInfoProto() {
      return RaftServerImpl.this.getRoleInfoProto();
    }

    @Override
    public long getCurrentTerm() {
      return getState().getCurrentTerm();
    }

    @Override
    public long getLastAppliedIndex() {
      return getState().getLastAppliedIndex();
    }

    @Override
    public long[] getFollowerNextIndices() {
      // Only meaningful while this server is leader; returns null otherwise.
      return role.getLeaderState()
          .filter(leader -> isLeader())
          .map(LeaderStateImpl::getFollowerNextIndices)
          .orElse(null);
    }
  }
| |
  private final RaftServerProxy proxy;
  private final StateMachine stateMachine;
  private final Info info = new Info();

  // Configuration values derived from RaftProperties at construction time.
  private final DivisionProperties divisionProperties;
  private final int maxTimeoutMs;
  private final TimeDuration leaderStepDownWaitTime;
  private final TimeDuration sleepDeviationThreshold;
  private final boolean installSnapshotEnabled;

  private final LifeCycle lifeCycle;
  private final ServerState state;
  private final RoleInfo role;

  private final DataStreamMap dataStreamMap;

  // Lazily created client for this server to submit requests to its own group.
  private final MemoizedSupplier<RaftClient> raftClient;

  private final RetryCacheImpl retryCache;
  private final CommitInfoCache commitInfoCache = new CommitInfoCache();

  private final RaftServerJmxAdapter jmxAdapter;
  private final LeaderElectionMetrics leaderElectionMetrics;
  private final RaftServerMetricsImpl raftServerMetrics;

  // Snapshot-installation bookkeeping.
  private final AtomicLong inProgressInstallSnapshotRequest;
  private final AtomicLong installedSnapshotIndex;
  private final AtomicBoolean isSnapshotNull;

  // Guards against handling an appendEntries before start() has completed.
  // Example: thread1 runs start() but, before it reaches startAsFollower(), thread2 handles an
  // appendEntries request and changes the state to RUNNING via
  // lifeCycle.compareAndTransition(STARTING, RUNNING).  Thread1 then calls
  // lifeCycle.transition(RUNNING) in startAsFollower() and fails with
  // "IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING".
  private final AtomicBoolean startComplete;

  private final TransferLeadership transferLeadership;
  private final SnapshotManagementRequestHandler snapshotRequestHandler;

  // Separate pools for server-side and client-side request handling.
  private final ExecutorService serverExecutor;
  private final ExecutorService clientExecutor;
| |
  /**
   * Creates the division of the given group on this server.
   * Only initializes state; the division does not participate in the group until {@link #start()}.
   *
   * @param group the raft group this division belongs to
   * @param stateMachine the application state machine for this group
   * @param proxy the owning server proxy supplying id, properties and RPC
   * @throws IOException if the server state (e.g. storage) cannot be initialized
   */
  RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
    final RaftPeerId id = proxy.getId();
    LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
    this.lifeCycle = new LifeCycle(id);
    this.stateMachine = stateMachine;
    this.role = new RoleInfo(id);

    final RaftProperties properties = proxy.getProperties();
    this.divisionProperties = new DivisionPropertiesImpl(properties);
    maxTimeoutMs = properties().maxRpcTimeoutMs();
    leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
    this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
    installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
    this.proxy = proxy;

    this.state = new ServerState(id, group, properties, this, stateMachine);
    this.retryCache = new RetryCacheImpl(properties);
    this.inProgressInstallSnapshotRequest = new AtomicLong();
    this.installedSnapshotIndex = new AtomicLong();
    this.isSnapshotNull = new AtomicBoolean(false);
    this.dataStreamMap = new DataStreamMapImpl(id);

    this.jmxAdapter = new RaftServerJmxAdapter();
    this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
        getMemberId(), state::getLastLeaderElapsedTimeMs);
    this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
        getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);

    this.startComplete = new AtomicBoolean(false);

    // Memoized so the client is only built on first use.
    this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
        .setRaftGroup(group)
        .setProperties(getRaftServer().getProperties())
        .build());

    this.transferLeadership = new TransferLeadership(this);
    this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);

    this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
        RaftServerConfigKeys.ThreadPool.serverCached(properties),
        RaftServerConfigKeys.ThreadPool.serverSize(properties),
        id + "-server");
    this.clientExecutor = ConcurrentUtils.newThreadPoolWithMax(
        RaftServerConfigKeys.ThreadPool.clientCached(properties),
        RaftServerConfigKeys.ThreadPool.clientSize(properties),
        id + "-client");
  }
| |
  /** @return the property-derived configuration of this division. */
  @Override
  public DivisionProperties properties() {
    return divisionProperties;
  }
| |
  /** Create a {@link LogAppender} for the given follower via the server's factory. */
  LogAppender newLogAppender(LeaderState leaderState, FollowerInfo f) {
    return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
  }
| |
  /** @return the maximum RPC timeout in milliseconds. */
  int getMaxTimeoutMs() {
    return maxTimeoutMs;
  }
| |
| TimeDuration getRandomElectionTimeout() { |
| final int min = properties().minRpcTimeoutMs(); |
| final int millis = min + ThreadLocalRandom.current().nextInt(properties().maxRpcTimeoutMs() - min + 1); |
| return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS); |
| } |
| |
  /** @return how long a stepping-down leader waits before relinquishing leadership. */
  TimeDuration getLeaderStepDownWaitTime() {
    return leaderStepDownWaitTime;
  }
| |
  /** @return the configured sleep-deviation threshold (see RaftServerConfigKeys). */
  TimeDuration getSleepDeviationThreshold() {
    return sleepDeviationThreshold;
  }
| |
  /** @return the application state machine of this division. */
  @Override
  public StateMachine getStateMachine() {
    return stateMachine;
  }
| |
  /** @return the raft log managed by this division's server state. */
  @Override
  public RaftLog getRaftLog() {
    return getState().getLog();
  }
| |
  /** @return the persistent storage backing this division. */
  @Override
  public RaftStorage getRaftStorage() {
    return getState().getStorage();
  }
| |
  /** @return the map of active data streams for this division. */
  @Override
  public DataStreamMap getDataStreamMap() {
    return dataStreamMap;
  }
| |
  /** @return the (lazily created) client for this server's own group. */
  @Override
  public RaftClient getRaftClient() {
    return raftClient.get();
  }
| |
  /** @return the retry cache used to deduplicate client retries. */
  @Override
  public RetryCacheImpl getRetryCache() {
    return retryCache;
  }
| |
  /** @return the owning server proxy. */
  @Override
  public RaftServerProxy getRaftServer() {
    return proxy;
  }
| |
  /** @return the server-to-server RPC service of the owning proxy. */
  RaftServerRpc getServerRpc() {
    return proxy.getServerRpc();
  }
| |
  /** Transition this server's role to {@code newRole}, logging the reason for auditability. */
  private void setRole(RaftPeerRole newRole, Object reason) {
    LOG.info("{}: changes role from {} to {} at term {} for {}",
        getMemberId(), this.role, newRole, state.getCurrentTerm(), reason);
    this.role.transitionRole(newRole);
  }
| |
  /**
   * Start this division.
   * If this peer is in the current configuration, start as a follower (which may trigger
   * elections); otherwise start in an initializing state that neither votes nor elects.
   *
   * @return true if this call performed the start; false if the server was already started
   *         (i.e. the NEW -> STARTING transition did not happen).
   */
  boolean start() {
    if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
      return false;
    }
    final RaftConfigurationImpl conf = getRaftConf();
    if (conf != null && conf.containsInBothConfs(getId())) {
      LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
      startAsFollower();
    } else {
      LOG.info("{}: start with initializing state, conf={}", getMemberId(), conf);
      startInitializing();
    }

    registerMBean(getId(), getMemberId().getGroupId(), jmxAdapter, jmxAdapter);
    state.start();
    // Mark start completion last; see the startComplete field comment.
    startComplete.compareAndSet(false, true);
    return true;
  }
| |
| static boolean registerMBean( |
| RaftPeerId id, RaftGroupId groupdId, RaftServerMXBean mBean, JmxRegister jmx) { |
| final String prefix = "Ratis:service=RaftServer,group=" + groupdId + ",id="; |
| final String registered = jmx.register(mBean, Arrays.asList( |
| () -> prefix + id, |
| () -> prefix + ObjectName.quote(id.toString()))); |
| return registered != null; |
| } |
| |
| /** |
| * The peer belongs to the current configuration, should start as a follower |
| */ |
| private void startAsFollower() { |
| setRole(RaftPeerRole.FOLLOWER, "startAsFollower"); |
| role.startFollowerState(this, "startAsFollower"); |
| lifeCycle.transition(RUNNING); |
| } |
| |
| /** |
| * The peer does not have any configuration (maybe it will later be included |
| * in some configuration). Start still as a follower but will not vote or |
| * start election. |
| */ |
| private void startInitializing() { |
| setRole(RaftPeerRole.FOLLOWER, "startInitializing"); |
| // do not start FollowerState |
| } |
| |
  /** @return the mutable server state (term, log, storage, leader id, ...). */
  ServerState getState() {
    return state;
  }
| |
  /** @return the (peer id, group id) identity of this division. */
  @Override
  public RaftGroupMemberId getMemberId() {
    return getState().getMemberId();
  }
| |
  /** @return a live status view of this division. */
  @Override
  public DivisionInfo getInfo() {
    return info;
  }
| |
  /** @return the current role information of this server. */
  RoleInfo getRole() {
    return role;
  }
| |
  /** @return the current raft configuration held by the server state. */
  @Override
  public RaftConfigurationImpl getRaftConf() {
    return getState().getRaftConf();
  }
| |
| /** |
| * This removes the group from the server. |
| * If the deleteDirectory flag is set to false, and renameDirectory |
| * the directory is moved to |
| * {@link RaftServerConfigKeys#REMOVED_GROUPS_DIR_KEY} location. |
| * If the deleteDirectory flag is true, the group is permanently deleted. |
| */ |
| void groupRemove(boolean deleteDirectory, boolean renameDirectory) { |
| final RaftStorageDirectory dir = state.getStorage().getStorageDir(); |
| |
| /* Shutdown is triggered here inorder to avoid any locked files. */ |
| close(); |
| getStateMachine().event().notifyGroupRemove(); |
| if (deleteDirectory) { |
| for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) { |
| try { |
| FileUtils.deleteFully(dir.getRoot()); |
| LOG.info("{}: Succeed to remove RaftStorageDirectory {}", getMemberId(), dir); |
| break; |
| } catch (NoSuchFileException e) { |
| LOG.warn("{}: Some file does not exist {}", getMemberId(), dir, e); |
| } catch (Exception ignored) { |
| LOG.error("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, ignored); |
| break; |
| } |
| } |
| } else if(renameDirectory) { |
| try { |
| /* Create path with current group in REMOVED_GROUPS_DIR_KEY location */ |
| File toBeRemovedGroupFolder = new File(RaftServerConfigKeys |
| .removedGroupsDir(proxy.getProperties()), |
| dir.getRoot().getName()); |
| |
| FileUtils.moveDirectory(dir.getRoot().toPath(), |
| toBeRemovedGroupFolder.toPath()); |
| |
| LOG.info("{}: Group {} is renamed successfully", getMemberId(), getGroup()); |
| } catch (IOException e) { |
| LOG.warn("{}: Failed to remove group {}", getMemberId(), |
| dir.getRoot().getName(), e); |
| } |
| } |
| } |
| |
  /**
   * Shut down this division.
   * Each shutdown step below is best-effort: a failure is logged and the remaining
   * steps are still executed, so that as many resources as possible are released.
   */
  @Override
  public void close() {
    lifeCycle.checkStateAndClose(() -> {
      LOG.info("{}: shutdown", getMemberId());
      try {
        jmxAdapter.unregister();
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), ignored);
      }
      try {
        role.shutdownFollowerState();
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), ignored);
      }
      try{
        role.shutdownLeaderElection();
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), ignored);
      }
      try{
        role.shutdownLeaderState(true);
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), ignored);
      }
      try{
        state.close();
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to close state", getMemberId(), ignored);
      }
      try {
        leaderElectionMetrics.unregister();
        raftServerMetrics.unregister();
        RaftServerMetricsImpl.removeRaftServerMetrics(getMemberId());
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
      }
      try {
        // Only close the client if it was ever created (it is memoized).
        if (raftClient.isInitialized()) {
          raftClient.get().close();
        }
      } catch (Exception ignored) {
        LOG.warn("{}: Failed to close raft client", getMemberId(), ignored);
      }

      try {
        ConcurrentUtils.shutdownAndWait(clientExecutor);
      } catch (Exception ignored) {
        LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", ignored);
      }
      try {
        ConcurrentUtils.shutdownAndWait(serverExecutor);
      } catch (Exception ignored) {
        LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", ignored);
      }
    });
  }
| |
| /** |
| * Change the server state to Follower if this server is in a different role or force is true. |
| * @param newTerm The new term. |
| * @param force Force to start a new {@link FollowerState} even if this server is already a follower. |
| * @return if the term/votedFor should be updated to the new term |
| */ |
| private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) { |
| final RaftPeerRole old = role.getCurrentRole(); |
| final boolean metadataUpdated = state.updateCurrentTerm(newTerm); |
| |
| if (old != RaftPeerRole.FOLLOWER || force) { |
| setRole(RaftPeerRole.FOLLOWER, reason); |
| if (old == RaftPeerRole.LEADER) { |
| role.shutdownLeaderState(false); |
| } else if (old == RaftPeerRole.CANDIDATE) { |
| role.shutdownLeaderElection(); |
| } else if (old == RaftPeerRole.FOLLOWER) { |
| role.shutdownFollowerState(); |
| } |
| role.startFollowerState(this, reason); |
| } |
| return metadataUpdated; |
| } |
| |
| synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException { |
| if (changeToFollower(newTerm, false, reason)) { |
| state.persistMetadata(); |
| } |
| } |
| |
  /**
   * Transition from candidate to leader after winning an election.
   * Requires that this server currently is a candidate.
   */
  synchronized void changeToLeader() {
    Preconditions.assertTrue(getInfo().isCandidate());
    role.shutdownLeaderElection();
    setRole(RaftPeerRole.LEADER, "changeToLeader");
    state.becomeLeader();

    // start sending AppendEntries RPC to followers
    final LogEntryProto e = role.startLeaderState(this);
    getState().setRaftConf(e);
  }
| |
| @Override |
| public Collection<CommitInfoProto> getCommitInfos() { |
| final List<CommitInfoProto> infos = new ArrayList<>(); |
| // add the commit info of this server |
| infos.add(updateCommitInfoCache()); |
| |
| // add the commit infos of other servers |
| if (getInfo().isLeader()) { |
| role.getLeaderState().ifPresent( |
| leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos)); |
| } else { |
| getRaftConf().getAllPeers().stream() |
| .map(RaftPeer::getId) |
| .filter(id -> !id.equals(getId())) |
| .map(commitInfoCache::get) |
| .filter(Objects::nonNull) |
| .forEach(infos::add); |
| } |
| return infos; |
| } |
| |
  /** Build a {@link GroupInfoReply} with commit infos, group, role info and storage health. */
  GroupInfoReply getGroupInfo(GroupInfoRequest request) {
    return new GroupInfoReply(request, getCommitInfos(),
        getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
  }
| |
| private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) { |
| RaftPeerRole currentRole = role.getCurrentRole(); |
| RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder() |
| .setSelf(getPeer().getRaftPeerProto()) |
| .setRole(currentRole) |
| .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()); |
| final Optional<FollowerState> fs = role.getFollowerState(); |
| final ServerRpcProto leaderInfo = |
| ServerProtoUtils.toServerRpcProto(leaderPeerInfo, |
| fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); |
| roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo) |
| .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0))); |
| return roleInfo.build(); |
| } |
| |
  /**
   * Build a {@link RoleInfoProto} describing this server's current role, with
   * role-specific detail (candidate / follower / leader).
   */
  RoleInfoProto getRoleInfoProto() {
    RaftPeerRole currentRole = role.getCurrentRole();
    RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
        .setSelf(getPeer().getRaftPeerProto())
        .setRole(currentRole)
        .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
    switch (currentRole) {
    case CANDIDATE:
      CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
          .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
      roleInfo.setCandidateInfo(candidate);
      break;

    case FOLLOWER:
      final Optional<FollowerState> fs = role.getFollowerState();
      final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
          getRaftConf().getPeer(state.getLeaderId()),
          fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
      // FollowerState can be null while adding a new peer as it is not
      // a voting member yet
      roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
          .setLeaderInfo(leaderInfo)
          .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
      break;

    case LEADER:
      // Report per-follower RPC response times and the leader's current term.
      role.getLeaderState().ifPresent(ls -> {
        final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
        ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
            leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto(
                f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
        leader.setTerm(ls.getCurrentTerm());
        roleInfo.setLeaderInfo(leader);
      });
      break;

    default:
      throw new IllegalStateException("incorrect role of server " + currentRole);
    }
    return roleInfo.build();
  }
| |
  /**
   * Transition from follower to candidate and start a leader election.
   * Requires that this server currently is a follower.
   *
   * @param forceStartLeaderElection whether to start the election even when a normal
   *        start would be skipped (passed through to {@code RoleInfo.startLeaderElection}).
   */
  synchronized void changeToCandidate(boolean forceStartLeaderElection) {
    Preconditions.assertTrue(getInfo().isFollower());
    role.shutdownFollowerState();
    setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
    if (state.shouldNotifyExtendedNoLeader()) {
      stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
    }
    // start election
    role.startLeaderElection(this, forceStartLeaderElection);
  }
| |
| @Override |
| public String toString() { |
| return role + " " + state + " " + lifeCycle.getCurrentState(); |
| } |
| |
  /** @return a reply builder pre-populated with the request and current commit infos. */
  RaftClientReply.Builder newReplyBuilder(RaftClientRequest request) {
    return RaftClientReply.newBuilder()
        .setRequest(request)
        .setCommitInfos(getCommitInfos());
  }
| |
  /**
   * @return a reply builder for a cached/pending invocation (no original request object),
   *         pre-populated with server id, log index and current commit infos.
   */
  private RaftClientReply.Builder newReplyBuilder(ClientInvocationId invocationId, long logIndex) {
    return RaftClientReply.newBuilder()
        .setClientInvocationId(invocationId)
        .setLogIndex(logIndex)
        .setServerId(getMemberId())
        .setCommitInfos(getCommitInfos());
  }
| |
  /** @return a success reply for the given request. */
  RaftClientReply newSuccessReply(RaftClientRequest request) {
    return newReplyBuilder(request)
        .setSuccess()
        .build();
  }
| |
  /** @return a success reply for the given request, carrying the given log index. */
  RaftClientReply newSuccessReply(RaftClientRequest request, long logIndex) {
    return newReplyBuilder(request)
        .setSuccess()
        .setLogIndex(logIndex)
        .build();
  }
| |
  /** @return a failure reply for the given request, carrying the given exception. */
  RaftClientReply newExceptionReply(RaftClientRequest request, RaftException exception) {
    return newReplyBuilder(request)
        .setException(exception)
        .build();
  }
| |
| /** |
| * @return null if the server is in leader state. |
| */ |
| private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry, |
| boolean isWrite) { |
| try { |
| assertGroup(request.getRequestorId(), request.getRaftGroupId()); |
| } catch (GroupMismatchException e) { |
| return RetryCacheImpl.failWithException(e, entry); |
| } |
| |
| if (!getInfo().isLeader()) { |
| NotLeaderException exception = generateNotLeaderException(); |
| final RaftClientReply reply = newExceptionReply(request, exception); |
| return RetryCacheImpl.failWithReply(reply, entry); |
| } |
| if (!getInfo().isLeaderReady()) { |
| final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request)); |
| if (cacheEntry != null && cacheEntry.isCompletedNormally()) { |
| return cacheEntry.getReplyFuture(); |
| } |
| final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId()); |
| final RaftClientReply reply = newExceptionReply(request, lnre); |
| return RetryCacheImpl.failWithReply(reply, entry); |
| } |
| |
| if (isWrite && isSteppingDown()) { |
| final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down"); |
| final RaftClientReply reply = newExceptionReply(request, lsde); |
| return RetryCacheImpl.failWithReply(reply, entry); |
| } |
| |
| return null; |
| } |
| |
| NotLeaderException generateNotLeaderException() { |
| if (lifeCycle.getCurrentState() != RUNNING) { |
| return new NotLeaderException(getMemberId(), null, null); |
| } |
| RaftPeerId leaderId = state.getLeaderId(); |
| if (leaderId == null || leaderId.equals(getId())) { |
| // No idea about who is the current leader. Or the peer is the current |
| // leader, but it is about to step down. set the suggested leader as null. |
| leaderId = null; |
| } |
| final RaftConfigurationImpl conf = getRaftConf(); |
| Collection<RaftPeer> peers = conf.getAllPeers(); |
| return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), peers); |
| } |
| |
  /**
   * Assert that the life cycle is currently in one of the expected states.
   *
   * @return the current state when it is one of {@code expected}
   * @throws ServerNotReadyException when the current state is not in {@code expected}
   */
  private LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyException {
    return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
        getMemberId() + " is not in " + expected + ": current state is " + c),
        expected);
  }
| |
| void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException { |
| final RaftGroupId groupId = getMemberId().getGroupId(); |
| if (!groupId.equals(requestorGroupId)) { |
| throw new GroupMismatchException(getMemberId() |
| + ": The group (" + requestorGroupId + ") of " + requestorId |
| + " does not match the group (" + groupId + ") of the server " + getId()); |
| } |
| } |
| |
| /** |
| * Handle a normal update request from client. |
| */ |
| private CompletableFuture<RaftClientReply> appendTransaction( |
| RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException { |
| assertLifeCycleState(LifeCycle.States.RUNNING); |
| CompletableFuture<RaftClientReply> reply; |
| |
| final PendingRequest pending; |
| synchronized (this) { |
| reply = checkLeaderState(request, cacheEntry, true); |
| if (reply != null) { |
| return reply; |
| } |
| |
| // append the message to its local log |
| final LeaderStateImpl leaderState = role.getLeaderStateNonNull(); |
| final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage()); |
| if (permit == null) { |
| cacheEntry.failWithException(new ResourceUnavailableException( |
| getMemberId() + ": Failed to acquire a pending write request for " + request)); |
| return cacheEntry.getReplyFuture(); |
| } |
| try { |
| state.appendLog(context); |
| } catch (StateMachineException e) { |
| // the StateMachineException is thrown by the SM in the preAppend stage. |
| // Return the exception in a RaftClientReply. |
| RaftClientReply exceptionReply = newExceptionReply(request, e); |
| cacheEntry.failWithReply(exceptionReply); |
| // leader will step down here |
| if (e.leaderShouldStepDown() && getInfo().isLeader()) { |
| leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION); |
| } |
| return CompletableFuture.completedFuture(exceptionReply); |
| } |
| |
| // put the request into the pending queue |
| pending = leaderState.addPendingRequest(permit, request, context); |
| if (pending == null) { |
| cacheEntry.failWithException(new ResourceUnavailableException( |
| getMemberId() + ": Failed to add a pending write request for " + request)); |
| return cacheEntry.getReplyFuture(); |
| } |
| leaderState.notifySenders(); |
| } |
| return pending.getFuture(); |
| } |
| |
  /** If this server is leader, ask it to step down because a JVM pause was detected. */
  void stepDownOnJvmPause() {
    role.getLeaderState().ifPresent(leader -> leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
  }
| |
| private RaftClientRequest filterDataStreamRaftClientRequest(RaftClientRequest request) |
| throws InvalidProtocolBufferException { |
| return !request.is(TypeCase.FORWARD) ? request : ClientProtoUtils.toRaftClientRequest( |
| RaftClientRequestProto.parseFrom( |
| request.getMessage().getContent().asReadOnlyByteBuffer())); |
| } |
| |
  /**
   * Run the submit function on the server executor.
   * Note: the outer {@code join()} blocks the calling thread until the submit function
   * has been executed (it unwraps the inner future, which itself is returned unjoined).
   */
  <REPLY> CompletableFuture<REPLY> executeSubmitServerRequestAsync(
      CheckedSupplier<CompletableFuture<REPLY>, IOException> submitFunction) {
    return CompletableFuture.supplyAsync(
        () -> JavaUtils.callAsUnchecked(submitFunction, CompletionException::new),
        serverExecutor).join();
  }
| |
  /**
   * Run {@link #submitClientRequestAsync(RaftClientRequest)} on the client executor.
   * Note: the outer {@code join()} blocks the calling thread until the request has been
   * submitted (it unwraps the inner reply future, which itself is returned unjoined).
   */
  CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientRequest request) {
    return CompletableFuture.supplyAsync(
        () -> JavaUtils.callAsUnchecked(() -> submitClientRequestAsync(request), CompletionException::new),
        clientExecutor).join();
  }
| |
| @Override |
| public CompletableFuture<RaftClientReply> submitClientRequestAsync( |
| RaftClientRequest request) throws IOException { |
| assertLifeCycleState(LifeCycle.States.RUNNING); |
| LOG.debug("{}: receive client request({})", getMemberId(), request); |
| final Optional<Timer> timer = Optional.ofNullable(raftServerMetrics.getClientRequestTimer(request.getType())); |
| |
| final CompletableFuture<RaftClientReply> replyFuture; |
| |
| if (request.is(TypeCase.STALEREAD)) { |
| replyFuture = staleReadAsync(request); |
| } else { |
| // first check the server's leader state |
| CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, |
| !request.is(TypeCase.READ) && !request.is(TypeCase.WATCH)); |
| if (reply != null) { |
| return reply; |
| } |
| |
| // let the state machine handle read-only request from client |
| RaftClientRequest.Type type = request.getType(); |
| if (type.is(TypeCase.MESSAGESTREAM)) { |
| if (type.getMessageStream().getEndOfRequest()) { |
| final CompletableFuture<RaftClientRequest> f = streamEndOfRequestAsync(request); |
| if (f.isCompletedExceptionally()) { |
| return f.thenApply(r -> null); |
| } |
| request = f.join(); |
| type = request.getType(); |
| } |
| } |
| |
| if (type.is(TypeCase.READ)) { |
| // TODO: We might not be the leader anymore by the time this completes. |
| // See the RAFT paper section 8 (last part) |
| replyFuture = processQueryFuture(stateMachine.query(request.getMessage()), request); |
| } else if (type.is(TypeCase.WATCH)) { |
| replyFuture = watchAsync(request); |
| } else if (type.is(TypeCase.MESSAGESTREAM)) { |
| replyFuture = streamAsync(request); |
| } else { |
| // query the retry cache |
| final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request)); |
| final CacheEntry cacheEntry = queryResult.getEntry(); |
| if (queryResult.isRetry()) { |
| // if the previous attempt is still pending or it succeeded, return its |
| // future |
| replyFuture = cacheEntry.getReplyFuture(); |
| } else { |
| // TODO: this client request will not be added to pending requests until |
| // later which means that any failure in between will leave partial state in |
| // the state machine. We should call cancelTransaction() for failed requests |
| TransactionContext context = stateMachine.startTransaction(filterDataStreamRaftClientRequest(request)); |
| if (context.getException() != null) { |
| final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); |
| final RaftClientReply exceptionReply = newExceptionReply(request, e); |
| cacheEntry.failWithReply(exceptionReply); |
| replyFuture = CompletableFuture.completedFuture(exceptionReply); |
| } else { |
| replyFuture = appendTransaction(request, context, cacheEntry); |
| } |
| } |
| } |
| } |
| |
| final RaftClientRequest.Type type = request.getType(); |
| replyFuture.whenComplete((clientReply, exception) -> { |
| if (clientReply.isSuccess()) { |
| timer.map(Timer::time).ifPresent(Timer.Context::stop); |
| } |
| if (exception != null || clientReply.getException() != null) { |
| raftServerMetrics.incFailedRequestCount(type); |
| } |
| }); |
| return replyFuture; |
| } |
| |
| private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) { |
| return role.getLeaderState() |
| .map(ls -> ls.addWatchReqeust(request)) |
| .orElseGet(() -> CompletableFuture.completedFuture( |
| newExceptionReply(request, generateNotLeaderException()))); |
| } |
| |
| private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) { |
| final long minIndex = request.getType().getStaleRead().getMinIndex(); |
| final long commitIndex = state.getLog().getLastCommittedIndex(); |
| LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex, commitIndex); |
| if (commitIndex < minIndex) { |
| final StaleReadException e = new StaleReadException( |
| "Unable to serve stale-read due to server commit index = " + commitIndex + " < min = " + minIndex); |
| return CompletableFuture.completedFuture( |
| newExceptionReply(request, new StateMachineException(getMemberId(), e))); |
| } |
| return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request); |
| } |
| |
| private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) { |
| return role.getLeaderState() |
| .map(ls -> ls.streamAsync(request)) |
| .orElseGet(() -> CompletableFuture.completedFuture( |
| newExceptionReply(request, generateNotLeaderException()))); |
| } |
| |
| private CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) { |
| return role.getLeaderState() |
| .map(ls -> ls.streamEndOfRequestAsync(request)) |
| .orElse(null); |
| } |
| |
| CompletableFuture<RaftClientReply> processQueryFuture( |
| CompletableFuture<Message> queryFuture, RaftClientRequest request) { |
| return queryFuture.thenApply(r -> newReplyBuilder(request).setSuccess().setMessage(r).build()) |
| .exceptionally(e -> { |
| e = JavaUtils.unwrapCompletionException(e); |
| if (e instanceof StateMachineException) { |
| return newExceptionReply(request, (StateMachineException)e); |
| } |
| throw new CompletionException(e); |
| }); |
| } |
| |
| @Override |
| public RaftClientReply submitClientRequest(RaftClientRequest request) |
| throws IOException { |
| return waitForReply(request, submitClientRequestAsync(request)); |
| } |
| |
| RaftClientReply waitForReply(RaftClientRequest request, CompletableFuture<RaftClientReply> future) |
| throws IOException { |
| return waitForReply(getMemberId(), request, future, e -> newExceptionReply(request, e)); |
| } |
| |
| static <REPLY extends RaftClientReply> REPLY waitForReply( |
| Object id, RaftClientRequest request, CompletableFuture<REPLY> future, |
| Function<RaftException, REPLY> exceptionReply) |
| throws IOException { |
| try { |
| return future.get(); |
| } catch (InterruptedException e) { |
| final String s = id + ": Interrupted when waiting for reply, request=" + request; |
| LOG.info(s, e); |
| Thread.currentThread().interrupt(); |
| throw IOUtils.toInterruptedIOException(s, e); |
| } catch (ExecutionException e) { |
| final Throwable cause = e.getCause(); |
| if (cause == null) { |
| throw new IOException(e); |
| } |
| if (cause instanceof NotLeaderException || |
| cause instanceof StateMachineException) { |
| final REPLY reply = exceptionReply.apply((RaftException) cause); |
| if (reply != null) { |
| return reply; |
| } |
| } |
| throw IOUtils.asIOException(cause); |
| } |
| } |
| |
| public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException { |
| return waitForReply(request, transferLeadershipAsync(request)); |
| } |
| |
| private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail( |
| TransferLeadershipRequest request, String msg) { |
| LOG.warn(msg); |
| return CompletableFuture.completedFuture( |
| newExceptionReply(request, new TransferLeadershipException(msg))); |
| } |
| |
  /** @return true if this server is currently stepping down as part of a leadership transfer. */
  boolean isSteppingDown() {
    return transferLeadership.isSteppingDown();
  }
| |
  /**
   * Complete any pending leadership transfer with the current leader id.
   * NOTE(review): the boolean argument presumably means "timed out" — confirm against
   * TransferLeadership#finish.
   */
  void finishTransferLeadership() {
    transferLeadership.finish(state.getLeaderId(), false);
  }
| |
  /**
   * Asynchronously transfer leadership to the peer named in the request.
   * <p>
   * Succeeds immediately when the target is this server itself. The transfer is refused (with
   * a {@link TransferLeadershipException} reply) when a reconfiguration is in progress, when
   * the target peer is not in the current configuration, or when the target does not have the
   * highest priority in the configuration.
   *
   * @throws IOException if this server is not running or the request targets the wrong group.
   */
  public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
      throws IOException {
    LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
    assertLifeCycleState(LifeCycle.States.RUNNING);
    assertGroup(request.getRequestorId(), request.getRaftGroupId());

    // Hold the server lock so the leader-state/configuration checks and the start of the
    // transfer are atomic with respect to other role or configuration changes.
    synchronized (this) {
      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
      if (reply != null) {
        return reply;
      }

      // Transferring leadership to ourselves is a no-op.
      if (getId().equals(request.getNewLeader())) {
        return CompletableFuture.completedFuture(newSuccessReply(request));
      }

      final RaftConfigurationImpl conf = getRaftConf();
      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();

      // make sure there is no raft reconfiguration in progress
      if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
            " when raft reconfiguration in progress.";
        return logAndReturnTransferLeadershipFail(request, msg);
      }

      if (!conf.containsInConf(request.getNewLeader())) {
        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
            " as it is not in " + conf;
        return logAndReturnTransferLeadershipFail(request, msg);
      }

      if (!conf.isHighestPriority(request.getNewLeader())) {
        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
            " as it does not has highest priority " + conf;
        return logAndReturnTransferLeadershipFail(request, msg);
      }

      return transferLeadership.start(request);
    }
  }
| |
  /**
   * Trigger a snapshot of the state machine.
   * <p>
   * When the gap between the last applied index and the latest snapshot index is smaller
   * than the configured creation gap, no new snapshot is taken and the existing snapshot
   * index is returned as a success. Fails with a {@link RaftException} reply if a snapshot
   * installation is currently in progress.
   *
   * @throws IOException if this server is not running or the request targets the wrong group.
   */
  CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
    LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
    assertLifeCycleState(LifeCycle.States.RUNNING);
    assertGroup(request.getRequestorId(), request.getRaftGroupId());

    //TODO(liuyaolong): get the gap value from shell command
    long minGapValue = RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
    // Index of the latest snapshot, or 0 when no snapshot exists yet.
    final long lastSnapshotIndex = Optional.ofNullable(stateMachine.getLatestSnapshot())
        .map(SnapshotInfo::getIndex)
        .orElse(0L);
    // Too few new log entries since the last snapshot: reply success without taking one.
    if (state.getLastAppliedIndex() - lastSnapshotIndex < minGapValue) {
      return CompletableFuture.completedFuture(newSuccessReply(request, lastSnapshotIndex));
    }

    synchronized (this) {
      long installSnapshot = inProgressInstallSnapshotRequest.get();
      // check snapshot install/load
      if (installSnapshot != 0) {
        String msg = String.format("%s: Failed do snapshot as snapshot (%s) installation is in progress",
            getMemberId(), installSnapshot);
        LOG.warn(msg);
        return CompletableFuture.completedFuture(newExceptionReply(request,new RaftException(msg)));
      }
      return snapshotRequestHandler.takingSnapshotAsync(request);
    }
  }
| |
  /** @return the handler for snapshot management requests. */
  SnapshotManagementRequestHandler getSnapshotRequestHandler() {
    return snapshotRequestHandler;
  }
| |
  /**
   * Pause or resume leader election on this server, as indicated by the request.
   * NOTE(review): the log message says "pauseLeaderElection" although this method
   * handles both pause and resume requests.
   *
   * @return a successful reply for a pause or resume request; a future completed
   *         exceptionally with {@link UnsupportedOperationException} for any other request.
   * @throws IOException if this server is not running or the request targets the wrong group.
   */
  CompletableFuture<RaftClientReply> setLeaderElectionAsync(LeaderElectionRequest request) throws IOException {
    LOG.info("{} receive pauseLeaderElection {}", getMemberId(), request);
    assertLifeCycleState(LifeCycle.States.RUNNING);
    assertGroup(request.getRequestorId(), request.getRaftGroupId());

    // assumes getPause() returns non-null only for a pause request — per the checks below
    final LeaderElectionRequest.Pause pause = request.getPause();
    if (pause != null) {
      getRole().setLeaderElectionPause(true);
      return CompletableFuture.completedFuture(newSuccessReply(request));
    }
    // assumes getResume() returns non-null only for a resume request
    final LeaderElectionRequest.Resume resume = request.getResume();
    if (resume != null) {
      getRole().setLeaderElectionPause(false);
      return CompletableFuture.completedFuture(newSuccessReply(request));
    }
    return JavaUtils.completeExceptionally(new UnsupportedOperationException(
        getId() + ": Request not supported " + request));
  }
| |
| public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException { |
| return waitForReply(request, setConfigurationAsync(request)); |
| } |
| |
| /** |
| * Handle a raft configuration change request from client. |
| */ |
| public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) throws IOException { |
| LOG.info("{}: receive setConfiguration {}", getMemberId(), request); |
| assertLifeCycleState(LifeCycle.States.RUNNING); |
| assertGroup(request.getRequestorId(), request.getRaftGroupId()); |
| |
| CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, true); |
| if (reply != null) { |
| return reply; |
| } |
| |
| final List<RaftPeer> peersInNewConf = request.getPeersInNewConf(); |
| final PendingRequest pending; |
| synchronized (this) { |
| reply = checkLeaderState(request, null, false); |
| if (reply != null) { |
| return reply; |
| } |
| |
| final RaftConfigurationImpl current = getRaftConf(); |
| final LeaderStateImpl leaderState = role.getLeaderStateNonNull(); |
| // make sure there is no other raft reconfiguration in progress |
| if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) { |
| throw new ReconfigurationInProgressException( |
| "Reconfiguration is already in progress: " + current); |
| } |
| |
| // return success with a null message if the new conf is the same as the current |
| if (current.hasNoChange(peersInNewConf)) { |
| pending = new PendingRequest(request); |
| pending.setReply(newSuccessReply(request)); |
| return pending.getFuture(); |
| } |
| |
| getRaftServer().addRaftPeers(peersInNewConf); |
| // add staging state into the leaderState |
| pending = leaderState.startSetConfiguration(request); |
| } |
| return pending.getFuture(); |
| } |
| |
| /** |
| * check if the remote peer is not included in the current conf |
| * and should shutdown. should shutdown if all the following stands: |
| * 1. this is a leader |
| * 2. current conf is stable and has been committed |
| * 3. candidate id is not included in conf |
| * 4. candidate's last entry's index < conf's index |
| */ |
| private boolean shouldSendShutdown(RaftPeerId candidateId, |
| TermIndex candidateLastEntry) { |
| return getInfo().isLeader() |
| && getRaftConf().isStable() |
| && getState().isConfCommitted() |
| && !getRaftConf().containsInConf(candidateId) |
| && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() |
| && role.getLeaderState().map(ls -> !ls.isBootStrappingPeer(candidateId)).orElse(false); |
| } |
| |
| @Override |
| public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException { |
| final RaftRpcRequestProto request = r.getServerRequest(); |
| return requestVote(r.getPreVote() ? Phase.PRE_VOTE : Phase.ELECTION, |
| RaftPeerId.valueOf(request.getRequestorId()), |
| ProtoUtils.toRaftGroupId(request.getRaftGroupId()), |
| r.getCandidateTerm(), |
| TermIndex.valueOf(r.getCandidateLastEntry())); |
| } |
| |
  /**
   * Decide whether to grant a vote (or pre-vote) to the given candidate.
   * <p>
   * In the ELECTION phase with a recognized candidate, this server steps down to follower,
   * records a granted vote, and persists metadata when the term or the vote changed.
   * When the vote is denied, the reply may also tell the candidate to shut down
   * (see {@link #shouldSendShutdown}).
   *
   * @throws IOException if this server is not running or the request targets the wrong group.
   */
  private RequestVoteReplyProto requestVote(Phase phase,
      RaftPeerId candidateId, RaftGroupId candidateGroupId,
      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
        candidateId, candidateTerm, candidateLastEntry);
    LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
        getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
    assertLifeCycleState(LifeCycle.States.RUNNING);
    assertGroup(candidateId, candidateGroupId);

    boolean shouldShutdown = false;
    final RequestVoteReplyProto reply;
    synchronized (this) {
      // Check life cycle state again to avoid the PAUSING/PAUSED state.
      assertLifeCycleState(LifeCycle.States.RUNNING);

      final VoteContext context = new VoteContext(this, phase, candidateId);
      // candidate is null when the candidate's term is not recognized
      final RaftPeer candidate = context.recognizeCandidate(candidateTerm);
      final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
      if (candidate != null && phase == Phase.ELECTION) {
        // change server state in the ELECTION phase
        final boolean termUpdated = changeToFollower(candidateTerm, true, "candidate:" + candidateId);
        if (voteGranted) {
          state.grantVote(candidate.getId());
        }
        if (termUpdated || voteGranted) {
          state.persistMetadata(); // sync metafile
        }
      }
      if (voteGranted) {
        // a granted vote counts as leader contact for election-timeout purposes
        role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE));
      } else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
        shouldShutdown = true;
      }
      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
          voteGranted, state.getCurrentTerm(), shouldShutdown);
      if (LOG.isInfoEnabled()) {
        LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
            getMemberId(), phase, ServerStringUtils.toRequestVoteReplyString(reply), state);
      }
    }
    return reply;
  }
| |
| private void validateEntries(long expectedTerm, TermIndex previous, |
| LogEntryProto... entries) { |
| if (entries != null && entries.length > 0) { |
| final long index0 = entries[0].getIndex(); |
| |
| if (previous == null || previous.getTerm() == 0) { |
| Preconditions.assertTrue(index0 == 0, |
| "Unexpected Index: previous is null but entries[%s].getIndex()=%s", |
| 0, index0); |
| } else { |
| Preconditions.assertTrue(previous.getIndex() == index0 - 1, |
| "Unexpected Index: previous is %s but entries[%s].getIndex()=%s", |
| previous, 0, index0); |
| } |
| |
| for (int i = 0; i < entries.length; i++) { |
| final long t = entries[i].getTerm(); |
| Preconditions.assertTrue(expectedTerm >= t, |
| "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s", |
| i, t, expectedTerm); |
| |
| final long indexi = entries[i].getIndex(); |
| Preconditions.assertTrue(indexi == index0 + i, |
| "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s", |
| i, indexi, index0); |
| } |
| } |
| } |
| |
| @Override |
| public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) |
| throws IOException { |
| try { |
| return appendEntriesAsync(r).join(); |
| } catch (CompletionException e) { |
| throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e)); |
| } |
| } |
| |
  /**
   * Handle an appendEntries RPC from the leader: unpack the proto, run the pre-checks
   * (life cycle, group, entry consistency), then delegate to the internal implementation.
   *
   * @throws IOException if a pre-check fails.
   */
  @Override
  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto r)
      throws IOException {
    // TODO avoid converting list to array
    final RaftRpcRequestProto request = r.getServerRequest();
    final LogEntryProto[] entries = r.getEntriesList()
        .toArray(new LogEntryProto[r.getEntriesCount()]);
    // previous is null when no previousLog is set, i.e. entries start at the head of the log
    final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null;
    final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());

    preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
        previous, r.getLeaderCommit(), r.getInitializing(), entries);
    try {
      return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries);
    } catch(Exception t) {
      LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
      throw t;
    }
  }
| |
| static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) { |
| if (isHeartbeat) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("HEARTBEAT: " + message.get()); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(message.get()); |
| } |
| } |
| } |
| |
| private Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType updateType) { |
| final Optional<FollowerState> fs = role.getFollowerState(); |
| if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) { |
| fs.get().updateLastRpcTime(updateType); |
| return fs; |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
  /**
   * Pre-checks for an appendEntries call: life cycle, server initialization, group
   * membership, and consistency of the entries against the previous entry.
   *
   * @throws ServerNotReadyException if the server role has not been initialized yet.
   * @throws IOException if the entries fail validation or the group does not match.
   */
  private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
      TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException {
    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);

    assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
    if (!startComplete.get()) {
      throw new ServerNotReadyException(getMemberId() + ": The server role is not yet initialized.");
    }
    assertGroup(leaderId, leaderGroupId);

    try {
      validateEntries(leaderTerm, previous, entries);
    } catch (IllegalArgumentException e) {
      // surface validation failures as IOException so the RPC layer reports them to the leader
      throw new IOException(e);
    }
  }
| |
  /** Update the commit info cache with this server's last committed log index. */
  private CommitInfoProto updateCommitInfoCache() {
    return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
  }
| |
  /**
   * Core follower-side handling of appendEntries.
   * <p>
   * Under the server lock: verify the sender is a recognized leader (NOT_LEADER reply
   * otherwise), step down to follower and record the leader, return an INCONSISTENCY reply
   * if the entries conflict with local state, and apply any configuration entries. Outside
   * the lock: append the entries to the log; when all appends complete, update the commit
   * index and reply SUCCESS.
   *
   * @return a future of the reply, completed once the entries have been appended locally.
   */
  @SuppressWarnings("checkstyle:parameternumber")
  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
      RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
      List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
    // an empty entry list is a heartbeat
    final boolean isHeartbeat = entries.length == 0;
    logAppendEntries(isHeartbeat,
        () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
            + previous + ", " + leaderCommit + ", " + initializing
            + ", commits" + ProtoUtils.toString(commitInfos)
            + ", entries: " + LogProtoUtils.toLogEntriesString(entries));

    final long currentTerm;
    final long followerCommit = state.getLog().getLastCommittedIndex();
    final Optional<FollowerState> followerState;
    Timer.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
    synchronized (this) {
      // Check life cycle state again to avoid the PAUSING/PAUSED state.
      assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
      currentTerm = state.getCurrentTerm();
      if (!recognized) {
        // reject: the sender is not a recognized leader (e.g. stale term)
        final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
            leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(), NOT_LEADER, callId,
            RaftLog.INVALID_LOG_INDEX, isHeartbeat);
        if (LOG.isDebugEnabled()) {
          LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
              getMemberId(), leaderId, leaderTerm, state, ServerStringUtils.toAppendEntriesReplyString(reply));
        }
        return CompletableFuture.completedFuture(reply);
      }
      try {
        changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
      } catch (IOException e) {
        return JavaUtils.completeExceptionally(e);
      }
      state.setLeader(leaderId, "appendEntries");

      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
        role.startFollowerState(this, Op.APPEND_ENTRIES);
      }
      followerState = updateLastRpcTime(FollowerState.UpdateType.APPEND_START);

      // Check that the append entries are not inconsistent. There are 3
      // scenarios which can result in inconsistency:
      // 1. There is a snapshot installation in progress
      // 2. There is an overlap between the snapshot index and the entries
      // 3. There is a gap between the local log and the entries
      // In any of these scenarios, we should return an INCONSISTENCY reply
      // back to leader so that the leader can update this follower's next index.

      AppendEntriesReplyProto inconsistencyReply = checkInconsistentAppendEntries(
          leaderId, currentTerm, followerCommit, previous, callId, isHeartbeat, entries);
      if (inconsistencyReply != null) {
        followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
        return CompletableFuture.completedFuture(inconsistencyReply);
      }

      state.updateConfiguration(entries);
    }

    // append to the local log outside the lock; each entry yields a future of its index
    final List<CompletableFuture<Long>> futures = entries.length == 0 ? Collections.emptyList()
        : state.getLog().append(entries);
    commitInfos.forEach(commitInfoCache::update);

    if (!isHeartbeat) {
      CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
    }
    return JavaUtils.allOf(futures).whenCompleteAsync(
        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
    ).thenApply(v -> {
      final AppendEntriesReplyProto reply;
      synchronized(this) {
        final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.length);
        state.updateCommitIndex(commitIndex, currentTerm, false);
        updateCommitInfoCache();
        // for a heartbeat there are no entries, so report the log's next index instead
        final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
        final long matchIndex = entries.length != 0 ? entries[entries.length - 1].getIndex() :
            RaftLog.INVALID_LOG_INDEX;
        reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
            state.getLog().getLastCommittedIndex(), n, SUCCESS, callId, matchIndex,
            isHeartbeat);
      }
      logAppendEntries(isHeartbeat, () -> getMemberId() + ": succeeded to handle AppendEntries. Reply: "
          + ServerStringUtils.toAppendEntriesReplyString(reply));
      timer.stop(); // TODO: future never completes exceptionally?
      return reply;
    });
  }
| |
| private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm, |
| long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, LogEntryProto... entries) { |
| final long replyNextIndex = checkInconsistentAppendEntries(previous, entries); |
| if (replyNextIndex == -1) { |
| return null; |
| } |
| |
| final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto( |
| leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, INCONSISTENCY, callId, |
| RaftLog.INVALID_LOG_INDEX, isHeartbeat); |
| LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), ServerStringUtils.toAppendEntriesReplyString(reply)); |
| return reply; |
| } |
| |
  /**
   * Determine whether the incoming appendEntries conflicts with local state.
   *
   * @return the next index this follower expects (to send back to the leader) when an
   *         inconsistency is detected; -1 when the request is consistent.
   */
  private long checkInconsistentAppendEntries(TermIndex previous, LogEntryProto... entries) {
    // Check if a snapshot installation through state machine is in progress.
    final long installSnapshot = inProgressInstallSnapshotRequest.get();
    if (installSnapshot != 0) {
      LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in progress", getMemberId(), installSnapshot);
      return state.getNextIndex();
    }

    // Check that the first log entry is greater than the snapshot index in the latest snapshot and follower's last
    // committed index. If not, reply to the leader the new next index.
    if (entries != null && entries.length > 0) {
      final long firstEntryIndex = entries[0].getIndex();
      final long snapshotIndex = state.getSnapshotIndex();
      final long commitIndex = state.getLog().getLastCommittedIndex();
      final long nextIndex = Math.max(snapshotIndex, commitIndex);
      if (nextIndex > 0 && nextIndex >= firstEntryIndex) {
        LOG.info("{}: Failed appendEntries as the first entry (index {})" +
            " already exists (snapshotIndex: {}, commitIndex: {})",
            getMemberId(), firstEntryIndex, snapshotIndex, commitIndex);
        return nextIndex + 1;
      }
    }

    // Check if "previous" is contained in current state.
    if (previous != null && !state.containsTermIndex(previous)) {
      final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
      LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
      return replyNextIndex;
    }

    // consistent: no inconsistency reply needed
    return -1;
  }
| |
| @Override |
| public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("{}: receive installSnapshot: {}", getMemberId(), |
| ServerStringUtils.toInstallSnapshotRequestString(request)); |
| } |
| final InstallSnapshotReplyProto reply; |
| try { |
| reply = installSnapshotImpl(request); |
| } catch (Exception e) { |
| LOG.error("{}: installSnapshot failed", getMemberId(), e); |
| throw e; |
| } |
| if (LOG.isInfoEnabled()) { |
| LOG.info("{}: reply installSnapshot: {}", getMemberId(), |
| ServerStringUtils.toInstallSnapshotReplyString(reply)); |
| } |
| return reply; |
| } |
| |
  /** Pause or resume leader election after verifying the server is RUNNING. */
  void setLeaderElectionPause(boolean pause) throws ServerNotReadyException {
    assertLifeCycleState(LifeCycle.States.RUNNING);
    role.setLeaderElectionPause(pause);
  }
| |
  /**
   * Pause this server: transition RUNNING -&gt; PAUSING, pause the state machine, then
   * transition PAUSING -&gt; PAUSED.
   *
   * @return true if the server was paused; false if it was not in the RUNNING state.
   */
  boolean pause() {
    // TODO: should pause() be limited on only working for a follower?

    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
    // Pause() should pause ongoing operations:
    // a. call {@link StateMachine#pause()}.
    synchronized (this) {
      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
        return false;
      }
      // TODO: any other operations that needs to be paused?
      stateMachine.pause();
      lifeCycle.compareAndTransition(PAUSING, PAUSED);
    }
    return true;
  }
| |
| boolean resume() throws IOException { |
| synchronized (this) { |
| if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) { |
| return false; |
| } |
| // TODO: any other operations that needs to be resumed? |
| try { |
| stateMachine.reinitialize(); |
| } catch (IOException e) { |
| LOG.warn("Failed to reinitialize statemachine: {}", stateMachine); |
| lifeCycle.compareAndTransition(STARTING, EXCEPTION); |
| throw e; |
| } |
| lifeCycle.compareAndTransition(STARTING, RUNNING); |
| } |
| return true; |
| } |
| |
| @Override |
| public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException { |
| final RaftRpcRequestProto r = request.getServerRequest(); |
| final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId()); |
| final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId()); |
| final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry()); |
| |
| CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request); |
| |
| LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{},", |
| getMemberId(), leaderId, request.getLeaderLastEntry()); |
| |
| assertLifeCycleState(LifeCycle.States.RUNNING); |
| assertGroup(leaderId, leaderGroupId); |
| |
| synchronized (this) { |
| // leaderLastEntry should not be null because LeaderStateImpl#start append a placeHolder entry |
| // so leader at each term should has at least one entry |
| if (leaderLastEntry == null) { |
| LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId()); |
| return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); |
| } |
| |
| // Check life cycle state again to avoid the PAUSING/PAUSED state. |
| assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); |
| final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm()); |
| if (!recognized) { |
| LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}", |
| getMemberId(), leaderId, leaderLastEntry.getTerm(), state); |
| return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); |
| } |
| |
| if (!getInfo().isFollower()) { |
| LOG.warn("{} refused StartLeaderElectionRequest from {}, because role is:{}", |
| getMemberId(), leaderId, role.getCurrentRole()); |
| return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); |
| } |
| |
| if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) { |
| LOG.warn("{} refused StartLeaderElectionRequest from {}, because lastEntry:{} less than leaderEntry:{}", |
| getMemberId(), leaderId, leaderLastEntry, state.getLastEntry()); |
| return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false); |
| } |
| |
| changeToCandidate(true); |
| return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true); |
| } |
| } |
| |
  /**
   * Dispatch an installSnapshot request based on this server's install-snapshot setting:
   * when enabled, a snapshot chunk sent by the leader is installed directly; when disabled,
   * a leader notification is forwarded to the state machine. A configuration entry included
   * with the snapshot is applied to the local state. A mismatch between what the leader sent
   * and this server's setting produces a CONF_MISMATCH reply.
   */
  private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
    final RaftRpcRequestProto r = request.getServerRequest();
    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
    final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(),
        leaderId, request);

    assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
    assertGroup(leaderId, leaderGroupId);

    // reply stays null when the request type does not match this server's setting
    InstallSnapshotReplyProto reply = null;
    // Check if install snapshot from Leader is enabled
    if (installSnapshotEnabled) {
      // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
      if (request.hasSnapshotChunk()) {
        reply = checkAndInstallSnapshot(request, leaderId);
      }
    } else {
      // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
      if (request.hasNotification()) {
        reply = notifyStateMachineToInstallSnapshot(request, leaderId);
      }
    }

    if (reply != null) {
      if (request.hasLastRaftConfigurationLogEntryProto()) {
        // Set the configuration included in the snapshot
        LogEntryProto newConfLogEntryProto =
            request.getLastRaftConfigurationLogEntryProto();
        LOG.info("{}: set new configuration {} from snapshot", getMemberId(),
            newConfLogEntryProto);

        state.setRaftConf(newConfLogEntryProto);
        state.writeRaftConfiguration(newConfLogEntryProto);
        stateMachine.event().notifyConfigurationChanged(newConfLogEntryProto.getTerm(), newConfLogEntryProto.getIndex(),
            newConfLogEntryProto.getConfigurationEntry());
      }
      return reply;
    }

    // There is a mismatch between configurations on leader and follower.
    final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
        leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
    LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
        getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
        leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
    return failedReply;
  }
| |
  /**
   * Install a snapshot chunk sent by the leader (the install-snapshot-enabled path).
   * Verifies the sender is a recognized leader for its term, steps this server down to
   * follower, appends the chunk to the local snapshot, and reloads the state machine
   * once the final chunk arrives.
   *
   * @param request the InstallSnapshot request carrying a snapshot chunk
   * @param leaderId id of the peer claiming leadership
   * @return a NOT_LEADER reply if the sender is not recognized as leader; SUCCESS otherwise
   * @throws IOException if persisting metadata or installing the snapshot chunk fails
   */
  private InstallSnapshotReplyProto checkAndInstallSnapshot(
      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
    final long currentTerm;
    final long leaderTerm = request.getLeaderTerm();
    InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
    // Index of the last log entry covered by the snapshot being installed.
    final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
    synchronized (this) {
      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
      currentTerm = state.getCurrentTerm();
      if (!recognized) {
        // The sender is not a valid leader for its claimed term: reject the chunk.
        final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
            currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
        LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId());
        return reply;
      }
      changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
      state.setLeader(leaderId, "installSnapshot");

      updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
      try {
        // Check and append the snapshot chunk. We simply put this in lock
        // considering a follower peer requiring a snapshot installation does not
        // have a lot of requests
        Preconditions.assertTrue(
            state.getLog().getNextIndex() <= lastIncludedIndex,
            "%s log's next id is %s, last included index in snapshot is %s",
            getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);

        //TODO: We should only update State with installed snapshot once the request is done.
        state.installSnapshot(request);

        // update the committed index
        // re-load the state machine if this is the last chunk
        if (snapshotChunkRequest.getDone()) {
          state.reloadStateMachine(lastIncludedIndex);
        }
      } finally {
        // Always record RPC completion so the follower's election timeout is reset.
        updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
      }
    }
    if (snapshotChunkRequest.getDone()) {
      LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
    }
    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
        currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
  }
| |
| private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( |
| InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { |
| final long currentTerm; |
| final long leaderTerm = request.getLeaderTerm(); |
| final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf( |
| request.getNotification().getFirstAvailableTermIndex()); |
| final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); |
| synchronized (this) { |
| final boolean recognized = state.recognizeLeader(leaderId, leaderTerm); |
| currentTerm = state.getCurrentTerm(); |
| if (!recognized) { |
| final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), |
| currentTerm, InstallSnapshotResult.NOT_LEADER, -1); |
| LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId()); |
| return reply; |
| } |
| changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); |
| state.setLeader(leaderId, "installSnapshot"); |
| long snapshotIndex = state.getSnapshotIndex(); |
| |
| updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); |
| if (inProgressInstallSnapshotRequest.compareAndSet(0, firstAvailableLogIndex)) { |
| LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex); |
| // Check if snapshot index is already at par or ahead of the first |
| // available log index of the Leader. |
| if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) { |
| // State Machine has already installed the snapshot. Return the |
| // latest snapshot index to the Leader. |
| |
| inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0); |
| LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), |
| InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); |
| return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, |
| InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); |
| } |
| |
| Optional<RaftPeerProto> leaderPeerInfo = null; |
| if (request.hasLastRaftConfigurationLogEntryProto()) { |
| List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry() |
| .getPeersList(); |
| leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst(); |
| Preconditions.assertTrue(leaderPeerInfo.isPresent()); |
| } |
| |
| // For the cases where RaftConf is empty on newly started peer with |
| // empty peer list, we retrieve leader info from |
| // installSnapShotRequestProto. |
| RoleInfoProto roleInfoProto = |
| getRaftConf().getPeer(state.getLeaderId()) == null ? |
| getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) : |
| getRoleInfoProto(); |
| // This is the first installSnapshot notify request for this term and |
| // index. Notify the state machine to install the snapshot. |
| LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", |
| getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); |
| try { |
| stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex) |
| .whenComplete((reply, exception) -> { |
| if (exception != null) { |
| LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", |
| getMemberId(), exception.getMessage()); |
| inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0); |
| return; |
| } |
| |
| if (reply != null) { |
| LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.", |
| getMemberId(), reply.getIndex()); |
| stateMachine.pause(); |
| state.updateInstalledSnapshotIndex(reply); |
| state.reloadStateMachine(reply.getIndex()); |
| installedSnapshotIndex.set(reply.getIndex()); |
| } else { |
| isSnapshotNull.set(true); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: StateMachine could not install snapshot as it is not available", this); |
| } |
| } |
| // wait for 1 seconds for statemachine to install snapshot |
| }).get(1, TimeUnit.SECONDS); |
| } catch (InterruptedException | TimeoutException t) { |
| //nothing to do |
| } catch (Exception t) { |
| // there are two cases: |
| //1 `get()` may throw ExecutionException if `whenComplete` throw an exception |
| //2 when generating completeFuture, `statemachine#notifyInstallSnapshotFromLeader` |
| // may throw an uncertain exception, which is determined by the implementation of |
| // user statemachine. |
| inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0); |
| final String err = getMemberId() + ": Failed to notify StateMachine to InstallSnapshot."; |
| LOG.warn(err + " " + t); |
| throw new IOException(err, t); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId()); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId()); |
| } |
| } |
| |
| // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE. |
| if (isSnapshotNull.compareAndSet(true, false)) { |
| LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(), |
| InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); |
| inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0); |
| return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), |
| currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1); |
| } |
| |
| // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset |
| // installedSnapshotIndex to 0. |
| long latestInstalledSnapshotIndex = this.installedSnapshotIndex.getAndSet(0); |
| if (latestInstalledSnapshotIndex > 0) { |
| LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(), |
| InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex); |
| inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0); |
| return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), |
| currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex); |
| } |
| |
| // Otherwise, Snapshot installation is in progress. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), |
| InstallSnapshotResult.IN_PROGRESS); |
| } |
| return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), |
| currentTerm, InstallSnapshotResult.IN_PROGRESS, -1); |
| } |
| } |
| |
| void submitUpdateCommitEvent() { |
| role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent); |
| } |
| |
| /** |
| * The log has been submitted to the state machine. Use the future to update |
| * the pending requests and retry cache. |
| * @param logEntry the log entry that has been submitted to the state machine |
| * @param stateMachineFuture the future returned by the state machine |
| * from which we will get transaction result later |
| */ |
| private CompletableFuture<Message> replyPendingRequest( |
| LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) { |
| Preconditions.assertTrue(logEntry.hasStateMachineLogEntry()); |
| final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry()); |
| // update the retry cache |
| final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId); |
| if (getInfo().isLeader()) { |
| Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(), |
| "retry cache entry should be pending: %s", cacheEntry); |
| } |
| if (cacheEntry.isFailed()) { |
| retryCache.refreshEntry(new CacheEntry(cacheEntry.getKey())); |
| } |
| |
| final long logIndex = logEntry.getIndex(); |
| return stateMachineFuture.whenComplete((reply, exception) -> { |
| final RaftClientReply.Builder b = newReplyBuilder(invocationId, logIndex); |
| final RaftClientReply r; |
| if (exception == null) { |
| r = b.setSuccess().setMessage(reply).build(); |
| } else { |
| // the exception is coming from the state machine. wrap it into the |
| // reply as a StateMachineException |
| final StateMachineException e = new StateMachineException(getMemberId(), exception); |
| r = b.setException(e).build(); |
| } |
| |
| // update pending request |
| role.getLeaderState().ifPresent(leader -> leader.replyPendingRequest(logIndex, r)); |
| cacheEntry.updateResult(r); |
| }); |
| } |
| |
| CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws RaftLogIOException { |
| if (!next.hasStateMachineLogEntry()) { |
| stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex()); |
| } |
| |
| if (next.hasConfigurationEntry()) { |
| // the reply should have already been set. only need to record |
| // the new conf in the metadata file and notify the StateMachine. |
| state.writeRaftConfiguration(next); |
| stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry()); |
| } else if (next.hasStateMachineLogEntry()) { |
| // check whether there is a TransactionContext because we are the leader. |
| TransactionContext trx = role.getLeaderState() |
| .map(leader -> leader.getTransactionContext(next.getIndex())).orElseGet( |
| () -> TransactionContext.newBuilder() |
| .setServerRole(role.getCurrentRole()) |
| .setStateMachine(stateMachine) |
| .setLogEntry(next) |
| .build()); |
| |
| try { |
| // Let the StateMachine inject logic for committed transactions in sequential order. |
| trx = stateMachine.applyTransactionSerial(trx); |
| |
| final CompletableFuture<Message> stateMachineFuture = stateMachine.applyTransaction(trx); |
| return replyPendingRequest(next, stateMachineFuture); |
| } catch (Exception e) { |
| throw new RaftLogIOException(e); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * The given log entry is being truncated. |
| * Fail the corresponding client request, if there is any. |
| * |
| * @param logEntry the log entry being truncated |
| */ |
| void notifyTruncatedLogEntry(LogEntryProto logEntry) { |
| if (logEntry.hasStateMachineLogEntry()) { |
| final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry()); |
| final CacheEntry cacheEntry = getRetryCache().getIfPresent(invocationId); |
| if (cacheEntry != null) { |
| cacheEntry.failWithReply(newReplyBuilder(invocationId, logEntry.getIndex()) |
| .setException(generateNotLeaderException()) |
| .build()); |
| } |
| } |
| } |
| |
  /** @return the metrics collected for leader elections on this server. */
  LeaderElectionMetrics getLeaderElectionMetrics() {
    return leaderElectionMetrics;
  }
| |
  /** @return the metrics collected for this raft server. */
  @Override
  public RaftServerMetricsImpl getRaftServerMetrics() {
    return raftServerMetrics;
  }
| |
| private class RaftServerJmxAdapter extends JmxRegister implements RaftServerMXBean { |
| @Override |
| public String getId() { |
| return getMemberId().getPeerId().toString(); |
| } |
| |
| @Override |
| public String getLeaderId() { |
| RaftPeerId leaderId = getState().getLeaderId(); |
| if (leaderId != null) { |
| return leaderId.toString(); |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public long getCurrentTerm() { |
| return getState().getCurrentTerm(); |
| } |
| |
| @Override |
| public String getGroupId() { |
| return getMemberId().getGroupId().toString(); |
| } |
| |
| @Override |
| public String getRole() { |
| return role.toString(); |
| } |
| |
| @Override |
| public List<String> getFollowers() { |
| return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElse(Collections.emptyList()) |
| .stream().map(RaftPeer::toString).collect(Collectors.toList()); |
| } |
| |
| @Override |
| public List<String> getGroups() { |
| return proxy.getGroupIds().stream().map(RaftGroupId::toString) |
| .collect(Collectors.toList()); |
| } |
| } |
| } |