| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.server.impl; |
| |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; |
| import org.apache.ratis.proto.RaftProtos.CommitInfoProto; |
| import org.apache.ratis.proto.RaftProtos.LogEntryProto; |
| import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; |
| import org.apache.ratis.proto.RaftProtos.RaftPeerRole; |
| import org.apache.ratis.proto.RaftProtos.ReplicationLevel; |
| import org.apache.ratis.proto.RaftProtos.RoleInfoProto; |
| import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto; |
| import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto; |
| import org.apache.ratis.protocol.Message; |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.RaftClientRequest; |
| import org.apache.ratis.protocol.RaftPeer; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.protocol.SetConfigurationRequest; |
| import org.apache.ratis.protocol.TransferLeadershipRequest; |
| import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; |
| import org.apache.ratis.protocol.exceptions.NotLeaderException; |
| import org.apache.ratis.protocol.exceptions.NotReplicatedException; |
| import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.leader.FollowerInfo; |
| import org.apache.ratis.server.leader.LeaderState; |
| import org.apache.ratis.server.leader.LogAppender; |
| import org.apache.ratis.server.metrics.LogAppenderMetrics; |
| import org.apache.ratis.server.metrics.RaftServerMetricsImpl; |
| import org.apache.ratis.server.protocol.TermIndex; |
| import org.apache.ratis.server.raftlog.LogEntryHeader; |
| import org.apache.ratis.server.raftlog.LogProtoUtils; |
| import org.apache.ratis.server.raftlog.RaftLog; |
| import org.apache.ratis.statemachine.TransactionContext; |
| import org.apache.ratis.util.CodeInjectionForTesting; |
| import org.apache.ratis.util.CollectionUtils; |
| import org.apache.ratis.util.Daemon; |
| import org.apache.ratis.util.JavaUtils; |
| import org.apache.ratis.util.Preconditions; |
| import org.apache.ratis.util.TimeDuration; |
| import org.apache.ratis.util.Timestamp; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import java.util.function.LongSupplier; |
| import java.util.function.Predicate; |
| import java.util.function.ToLongFunction; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.apache.ratis.server.RaftServer.Division.LOG; |
| import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY; |
| |
| /** |
| * States for leader only. It contains three different types of processors: |
| * 1. RPC senders: each thread is appending log to a follower |
| * 2. EventProcessor: a single thread updating the raft server's state based on |
| * status of log appending response |
| * 3. PendingRequestHandler: a handler sending back responses to clients when |
| * corresponding log entries are committed |
| */ |
| class LeaderStateImpl implements LeaderState { |
| public static final String APPEND_PLACEHOLDER = JavaUtils.getClassSimpleName(LeaderState.class) + ".placeholder"; |
| |
| private enum BootStrapProgress { |
| NOPROGRESS, PROGRESSING, CAUGHTUP |
| } |
| |
| static class StateUpdateEvent { |
| private enum Type { |
| STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING |
| } |
| |
| private final Type type; |
| private final long newTerm; |
| private final Runnable handler; |
| |
| StateUpdateEvent(Type type, long newTerm, Runnable handler) { |
| this.type = type; |
| this.newTerm = newTerm; |
| this.handler = handler; |
| } |
| |
| void execute() { |
| handler.run(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == this) { |
| return true; |
| } else if (!(obj instanceof StateUpdateEvent)) { |
| return false; |
| } |
| final StateUpdateEvent that = (StateUpdateEvent)obj; |
| return this.type == that.type && this.newTerm == that.newTerm; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(type, newTerm); |
| } |
| |
| @Override |
| public String toString() { |
| return type + (newTerm >= 0? ":" + newTerm: ""); |
| } |
| } |
| |
| private class EventQueue { |
| private final String name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096); |
| |
| void submit(StateUpdateEvent event) { |
| try { |
| queue.put(event); |
| } catch (InterruptedException e) { |
| LOG.info("{}: Interrupted when submitting {} ", this, event); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| StateUpdateEvent poll() { |
| final StateUpdateEvent e; |
| try { |
| e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| String s = this + ": poll() is interrupted"; |
| if (!running) { |
| LOG.info(s + " gracefully"); |
| return null; |
| } else { |
| throw new IllegalStateException(s + " UNEXPECTEDLY", ie); |
| } |
| } |
| |
| if (e != null) { |
| // remove duplicated events from the head. |
| while(e.equals(queue.peek())) { |
| queue.poll(); |
| } |
| } |
| return e; |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| /** |
| * Use {@link CopyOnWriteArrayList} to implement a thread-safe list. |
| * Since each mutation induces a copy of the list, only bulk operations |
| * (addAll and removeAll) are supported. |
| */ |
| static class SenderList { |
| private final List<LogAppender> senders; |
| |
| SenderList() { |
| this.senders = new CopyOnWriteArrayList<>(); |
| } |
| |
| Stream<LogAppender> stream() { |
| return senders.stream(); |
| } |
| |
| List<LogAppender> getSenders() { |
| return senders; |
| } |
| |
| void forEach(Consumer<LogAppender> action) { |
| senders.forEach(action); |
| } |
| |
| void addAll(Collection<LogAppender> newSenders) { |
| if (newSenders.isEmpty()) { |
| return; |
| } |
| |
| Preconditions.assertUnique( |
| CollectionUtils.as(senders, LogAppender::getFollowerId), |
| CollectionUtils.as(newSenders, LogAppender::getFollowerId)); |
| |
| final boolean changed = senders.addAll(newSenders); |
| Preconditions.assertTrue(changed); |
| } |
| |
| boolean removeAll(Collection<LogAppender> c) { |
| return senders.removeAll(c); |
| } |
| } |
| |
| private final StateUpdateEvent updateCommitEvent = |
| new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit); |
| private final StateUpdateEvent checkStagingEvent = |
| new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging); |
| |
| private final String name; |
| private final RaftServerImpl server; |
| private final RaftLog raftLog; |
| private final long currentTerm; |
| private volatile ConfigurationStagingState stagingState; |
| private List<List<RaftPeerId>> voterLists; |
| private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>(); |
| |
| /** |
| * The list of threads appending entries to followers. |
| * The list is protected by the RaftServer's lock. |
| */ |
| private final SenderList senders; |
| private final EventQueue eventQueue; |
| private final EventProcessor processor; |
| private final PendingRequests pendingRequests; |
| private final WatchRequests watchRequests; |
| private final MessageStreamRequests messageStreamRequests; |
| private volatile boolean running = true; |
| |
| private final int stagingCatchupGap; |
| private final long placeHolderIndex; |
| private final RaftServerMetricsImpl raftServerMetrics; |
| private final LogAppenderMetrics logAppenderMetrics; |
| private final long followerMaxGapThreshold; |
| private final PendingStepDown pendingStepDown; |
| |
| LeaderStateImpl(RaftServerImpl server) { |
| this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| this.server = server; |
| |
| final RaftProperties properties = server.getRaftServer().getProperties(); |
| stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties); |
| |
| final ServerState state = server.getState(); |
| this.raftLog = state.getLog(); |
| this.currentTerm = state.getCurrentTerm(); |
| |
| this.eventQueue = new EventQueue(); |
| processor = new EventProcessor(this.name); |
| raftServerMetrics = server.getRaftServerMetrics(); |
| logAppenderMetrics = new LogAppenderMetrics(server.getMemberId()); |
| this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics); |
| this.watchRequests = new WatchRequests(server.getMemberId(), properties); |
| this.messageStreamRequests = new MessageStreamRequests(server.getMemberId()); |
| this.pendingStepDown = new PendingStepDown(this); |
| long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties); |
| double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties); |
| |
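    // For example (the values here are illustrative, not defaults): with an element
    // limit of 4096 and a follower gap ratio max of 0.75, the threshold below is
    // (long) (0.75 * 4096) = 3072 entries.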
| if (followerGapRatioMax == -1) { |
| this.followerMaxGapThreshold = -1; |
| } else if (followerGapRatioMax > 1f || followerGapRatioMax <= 0f) { |
      throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY +
          "'s value must be within (0, 1] to enable the feature");
| } else { |
| this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests); |
| } |
| |
| final RaftConfigurationImpl conf = state.getRaftConf(); |
| Collection<RaftPeer> others = conf.getOtherPeers(server.getId()); |
| placeHolderIndex = raftLog.getNextIndex(); |
| |
| senders = new SenderList(); |
| addSenders(others, placeHolderIndex, true, RaftPeerRole.FOLLOWER); |
| |
| final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER); |
| if (!listeners.isEmpty()) { |
| addSenders(listeners, placeHolderIndex, false, RaftPeerRole.LISTENER); |
| } |
| voterLists = divideFollowers(conf); |
| } |
| |
| LogEntryProto start() { |
    // At the beginning of the new term, replicate a conf entry in order
    // to commit the entries from the previous terms.
    // This entry also helps identify the last committed index and the conf.
| final LogEntryProto placeHolder = LogProtoUtils.toLogEntryProto( |
| server.getRaftConf(), server.getState().getCurrentTerm(), raftLog.getNextIndex()); |
| CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, |
| server.getId().toString(), null); |
| raftLog.append(Collections.singletonList(placeHolder)); |
| processor.start(); |
| senders.forEach(LogAppender::start); |
| return placeHolder; |
| } |
| |
| boolean isReady() { |
| return server.getState().getLastAppliedIndex() >= placeHolderIndex; |
| } |
| |
| void stop() { |
| this.running = false; |
| // do not interrupt event processor since it may be in the middle of logSync |
| senders.forEach(LogAppender::stop); |
| final NotLeaderException nle = server.generateNotLeaderException(); |
| final Collection<CommitInfoProto> commitInfos = server.getCommitInfos(); |
| try { |
| final Collection<TransactionContext> transactions = pendingRequests.sendNotLeaderResponses(nle, commitInfos); |
| server.getStateMachine().leaderEvent().notifyNotLeader(transactions); |
| watchRequests.failWatches(nle); |
| } catch (IOException e) { |
| LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e); |
| } |
| messageStreamRequests.clear(); |
| server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); |
| logAppenderMetrics.unregister(); |
| raftServerMetrics.unregister(); |
| pendingRequests.close(); |
| } |
| |
| void notifySenders() { |
| senders.forEach(LogAppender::notifyLogAppender); |
| } |
| |
| boolean inStagingState() { |
| return stagingState != null; |
| } |
| |
| long getCurrentTerm() { |
| return currentTerm; |
| } |
| |
| @Override |
| public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { |
| if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) { |
| submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Start bootstrapping new peers |
| */ |
| PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) { |
| LOG.info("{}: startSetConfiguration {}", this, request); |
| Preconditions.assertTrue(running && !inStagingState()); |
| |
| final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER); |
| final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf); |
    final Collection<RaftPeer> listenersToBootStrap = server.getRaftConf().filterNotContainedInConf(listenersInNewConf);
| |
| // add the request to the pending queue |
| final PendingRequest pending = pendingRequests.addConfRequest(request); |
| |
| ConfigurationStagingState configurationStagingState = new ConfigurationStagingState( |
| peersToBootStrap, listenersToBootStrap, new PeerConfiguration(peersInNewConf, listenersInNewConf)); |
| Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers(); |
| Collection<RaftPeer> newListeners = configurationStagingState.getNewListeners(); |
| // set the staging state |
| this.stagingState = configurationStagingState; |
| |
| if (newPeers.isEmpty() && newListeners.isEmpty()) { |
| applyOldNewConf(); |
| } else { |
| // update the LeaderState's sender list |
| addAndStartSenders(newPeers, RaftPeerRole.FOLLOWER); |
| addAndStartSenders(newListeners, RaftPeerRole.LISTENER); |
| } |
| return pending; |
| } |
| |
| PendingRequests.Permit tryAcquirePendingRequest(Message message) { |
| return pendingRequests.tryAcquire(message); |
| } |
| |
| PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: addPendingRequest at {}, entry={}", this, request, |
| LogProtoUtils.toLogEntryString(entry.getLogEntry())); |
| } |
| return pendingRequests.add(permit, request, entry); |
| } |
| |
| CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) { |
| return messageStreamRequests.streamAsync(request) |
| .thenApply(dummy -> server.newSuccessReply(request)) |
| .exceptionally(e -> exception2RaftClientReply(request, e)); |
| } |
| |
| CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) { |
| return messageStreamRequests.streamEndOfRequestAsync(request) |
| .thenApply(bytes -> RaftClientRequest.toWriteRequest(request, Message.valueOf(bytes))); |
| } |
| |
  CompletableFuture<RaftClientReply> addWatchRequest(RaftClientRequest request) {
| LOG.debug("{}: addWatchRequest {}", this, request); |
| return watchRequests.add(request) |
| .thenApply(logIndex -> server.newSuccessReply(request, logIndex)) |
| .exceptionally(e -> exception2RaftClientReply(request, e)); |
| } |
| |
| private RaftClientReply exception2RaftClientReply(RaftClientRequest request, Throwable e) { |
| e = JavaUtils.unwrapCompletionException(e); |
| if (e instanceof NotReplicatedException) { |
| final NotReplicatedException nre = (NotReplicatedException)e; |
| return server.newReplyBuilder(request) |
| .setException(nre) |
| .setLogIndex(nre.getLogIndex()) |
| .build(); |
| } else if (e instanceof NotLeaderException) { |
| return server.newExceptionReply(request, (NotLeaderException)e); |
| } else if (e instanceof LeaderNotReadyException) { |
| return server.newExceptionReply(request, (LeaderNotReadyException)e); |
| } else { |
| throw new CompletionException(e); |
| } |
| } |
| |
| @Override |
| public void onFollowerCommitIndex(FollowerInfo follower, long commitIndex) { |
| if (follower.updateCommitIndex(commitIndex)) { |
| commitIndexChanged(); |
| } |
| } |
| |
| private void commitIndexChanged() { |
| getMajorityMin(FollowerInfo::getCommitIndex, raftLog::getLastCommittedIndex).ifPresent(m -> { |
| // Normally, leader commit index is always ahead of followers. |
| // However, after a leader change, the new leader commit index may |
| // be behind some followers in the beginning. |
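      // min is committed on every voter and majority on a majority of them; max is
      // safe for the plain MAJORITY level, since a commit index on any server already
      // implies that those entries are replicated on a majority.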
| watchRequests.update(ReplicationLevel.ALL_COMMITTED, m.min); |
| watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, m.majority); |
| watchRequests.update(ReplicationLevel.MAJORITY, m.max); |
| }); |
| notifySenders(); |
| } |
| |
| private void applyOldNewConf() { |
| final ServerState state = server.getState(); |
| final RaftConfigurationImpl current = state.getRaftConf(); |
    final RaftConfigurationImpl oldNewConf = stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
| // apply the (old, new) configuration to log, and use it as the current conf |
| long index = state.getLog().append(state.getCurrentTerm(), oldNewConf); |
| updateConfiguration(index, oldNewConf); |
| |
| this.stagingState = null; |
| notifySenders(); |
| } |
| |
| private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) { |
| Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex()); |
| voterLists = divideFollowers(newConf); |
| server.getState().setRaftConf(newConf); |
| } |
| |
| void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) { |
| for (LogAppender sender : senders.getSenders()) { |
| FollowerInfo info = sender.getFollower(); |
| protos.add(cache.update(info.getPeer(), info.getCommitIndex())); |
| } |
| } |
| |
| @Override |
| public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower, |
| List<LogEntryProto> entries, TermIndex previous, long callId) { |
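    // A follower that is not yet attending vote is still being bootstrapped, i.e. initializing.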
    final boolean initializing = !isAttendingVote(follower);
| final RaftPeerId targetId = follower.getPeer().getId(); |
| return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries, |
| ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()), |
| initializing, previous, server.getCommitInfos(), callId); |
| } |
| |
| /** |
| * Update sender list for setConfiguration request |
| */ |
| void addAndStartSenders(Collection<RaftPeer> newPeers, RaftPeerRole role) { |
| if (!newPeers.isEmpty()) { |
| addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false, role).forEach(LogAppender::start); |
| } |
| } |
| |
| Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote, |
| RaftPeerRole role) { |
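    // Back-date the initial last-rpc time so that each new follower immediately appears
    // overdue, triggering a first round of AppendEntries (see also checkLeadership()).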
| final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); |
| final List<LogAppender> newAppenders = newPeers.stream() |
| .map(peer -> { |
| final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote); |
| peerIdFollowerInfoMap.put(peer.getId(), f); |
| if (role == RaftPeerRole.FOLLOWER) { |
| raftServerMetrics.addFollower(peer.getId()); |
| logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime); |
| } |
| return server.newLogAppender(this, f); |
| }).collect(Collectors.toList()); |
| senders.addAll(newAppenders); |
| return newAppenders; |
| } |
| |
| void stopAndRemoveSenders(Predicate<LogAppender> predicate) { |
| final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList()); |
| toStop.forEach(LogAppender::stop); |
| senders.removeAll(toStop); |
| } |
| |
| @Override |
| public void restart(LogAppender sender) { |
| final FollowerInfo info = sender.getFollower(); |
| LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName()); |
| sender.stop(); |
| senders.removeAll(Collections.singleton(sender)); |
| |
| final RaftPeer peer = info.getPeer(); |
| if (server.getRaftConf().containsInConf(peer.getId())) { |
| addAndStartSenders(Collections.singleton(peer), RaftPeerRole.FOLLOWER); |
| } else if (server.getRaftConf().containsInConf(peer.getId(), RaftPeerRole.LISTENER)) { |
| addAndStartSenders(Collections.singleton(peer), RaftPeerRole.LISTENER); |
| } |
| } |
| |
| /** |
| * Update the RpcSender list based on the current configuration |
| */ |
| private void updateSenders(RaftConfigurationImpl conf) { |
| Preconditions.assertTrue(conf.isStable() && !inStagingState()); |
| stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)); |
| } |
| |
| void submitStepDownEvent(StepDownReason reason) { |
| submitStepDownEvent(getCurrentTerm(), reason); |
| } |
| |
| void submitStepDownEvent(long term, StepDownReason reason) { |
| eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term, reason))); |
| } |
| |
| private void stepDown(long term, StepDownReason reason) { |
| try { |
| server.changeToFollowerAndPersistMetadata(term, false, reason); |
| pendingStepDown.complete(server::newSuccessReply); |
| } catch(IOException e) { |
| final String s = this + ": Failed to persist metadata for term " + term; |
| LOG.warn(s, e); |
| // the failure should happen while changing the state to follower |
| // thus the in-memory state should have been updated |
| if (running) { |
| throw new IllegalStateException(s + " and running == true", e); |
| } |
| } |
| } |
| |
| CompletableFuture<RaftClientReply> submitStepDownRequestAsync(TransferLeadershipRequest request) { |
| return pendingStepDown.submitAsync(request); |
| } |
| |
| private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId follower, TermIndex lastEntry) { |
| ServerState state = server.getState(); |
| TermIndex currLastEntry = state.getLastEntry(); |
| if (ServerState.compareLog(currLastEntry, lastEntry) != 0) { |
| LOG.warn("{} can not send StartLeaderElectionRequest to follower:{} because currLastEntry:{} " + |
| "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry); |
| return; |
| } |
| |
| final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto( |
| server.getMemberId(), follower, lastEntry); |
| CompletableFuture.supplyAsync(() -> { |
| server.getLeaderElectionMetrics().onTransferLeadership(); |
| try { |
| StartLeaderElectionReplyProto replyProto = server.getServerRpc().startLeaderElection(r); |
| LOG.info("{} received {} reply of StartLeaderElectionRequest from follower:{}", |
| this, replyProto.getServerReply().getSuccess() ? "success" : "fail", follower); |
| } catch (IOException e) { |
| LOG.warn("{} send StartLeaderElectionRequest throw exception", this, e); |
| } |
| return null; |
| }); |
| } |
| |
| private void prepare() { |
| synchronized (server) { |
| if (running) { |
| final ServerState state = server.getState(); |
| if (state.getRaftConf().isTransitional() && state.isConfCommitted()) { |
| // the configuration is in transitional state, and has been committed |
| // so it is time to generate and replicate (new) conf. |
| replicateNewConf(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * The processor thread takes the responsibility to update the raft server's |
| * state, such as changing to follower, or updating the committed index. |
| */ |
| private class EventProcessor extends Daemon { |
| public EventProcessor(String name) { |
| setName(name); |
| } |
| @Override |
| public void run() { |
| // apply an empty message; check if necessary to replicate (new) conf |
| prepare(); |
| |
| while (running) { |
| final StateUpdateEvent event = eventQueue.poll(); |
| synchronized(server) { |
| if (running) { |
| if (event != null) { |
| event.execute(); |
| } else if (inStagingState()) { |
| checkStaging(); |
| } else { |
| yieldLeaderToHigherPriorityPeer(); |
| checkLeadership(); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * So far we use a simple implementation for catchup checking: |
| * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, |
| * the peer made no progress for that long. We should fail the whole |
| * setConfiguration request. |
| * 2. If the peer's matching index is just behind for a small gap, and the |
| * peer was updated recently (within max_timeout), declare the peer as |
| * caught-up. |
| * 3. Otherwise the peer is making progressing. Keep waiting. |
| */ |
| private BootStrapProgress checkProgress(FollowerInfo follower, long committed) { |
| Preconditions.assertTrue(!isAttendingVote(follower)); |
| final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); |
| final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs()); |
| if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { |
| LOG.debug("{} detects a follower {} timeout ({}ms) for bootstrapping", this, follower, |
| follower.getLastRpcResponseTime().elapsedTimeMs()); |
| return BootStrapProgress.NOPROGRESS; |
| } else if (follower.getMatchIndex() + stagingCatchupGap > committed |
| && follower.getLastRpcResponseTime().compareTo(progressTime) > 0 |
| && follower.hasAttemptedToInstallSnapshot()) { |
| return BootStrapProgress.CAUGHTUP; |
| } else { |
| return BootStrapProgress.PROGRESSING; |
| } |
| } |
| |
| private Collection<BootStrapProgress> checkAllProgress(long committed) { |
| Preconditions.assertTrue(inStagingState()); |
| return senders.stream() |
| .filter(sender -> !isAttendingVote(sender.getFollower())) |
| .map(sender -> checkProgress(sender.getFollower(), committed)) |
| .collect(Collectors.toCollection(ArrayList::new)); |
| } |
| |
| @Override |
| public void onFollowerSuccessAppendEntries(FollowerInfo follower) { |
| if (isAttendingVote(follower)) { |
| submitUpdateCommitEvent(); |
| } else { |
| eventQueue.submit(checkStagingEvent); |
| } |
| } |
| |
| @Override |
| public boolean isFollowerBootstrapping(FollowerInfo follower) { |
| return isBootStrappingPeer(follower.getPeer().getId()); |
| } |
| |
| private void checkStaging() { |
| if (!inStagingState()) { |
      // It is possible that the bootstrapping has already finished. Then, fall back to UPDATE_COMMIT.
| updateCommitEvent.execute(); |
| } else { |
| final long committedIndex = server.getState().getLog() |
| .getLastCommittedIndex(); |
| Collection<BootStrapProgress> reports = checkAllProgress(committedIndex); |
| if (reports.contains(BootStrapProgress.NOPROGRESS)) { |
| stagingState.fail(BootStrapProgress.NOPROGRESS); |
| } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { |
| // all caught up! |
| applyOldNewConf(); |
| senders.stream() |
| .map(LogAppender::getFollower) |
| .filter(f -> server.getRaftConf().containsInConf(f.getPeer().getId())) |
| .map(FollowerInfoImpl.class::cast) |
| .forEach(FollowerInfoImpl::startAttendVote); |
| } |
| } |
| } |
| |
| boolean isBootStrappingPeer(RaftPeerId peerId) { |
| return Optional.ofNullable(stagingState).map(s -> s.contains(peerId)).orElse(false); |
| } |
| |
| void submitUpdateCommitEvent() { |
| eventQueue.submit(updateCommitEvent); |
| } |
| |
| static class MinMajorityMax { |
| private final long min; |
| private final long majority; |
| private final long max; |
| |
| MinMajorityMax(long min, long majority, long max) { |
| this.min = min; |
| this.majority = majority; |
| this.max = max; |
| } |
| |
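    /**
     * Combine by taking the minimum of each field so that, in a transitional
     * configuration, a value is only as large as both the old and the new
     * majorities allow (joint consensus).
     */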
| MinMajorityMax combine(MinMajorityMax that) { |
| return new MinMajorityMax( |
| Math.min(this.min, that.min), |
| Math.min(this.majority, that.majority), |
| Math.min(this.max, that.max)); |
| } |
| |
| static MinMajorityMax valueOf(long[] sorted) { |
| return new MinMajorityMax(sorted[0], getMajority(sorted), getMax(sorted)); |
| } |
| |
| static MinMajorityMax valueOf(long[] sorted, long gapThreshold) { |
| long majority = getMajority(sorted); |
| long min = sorted[0]; |
| if (gapThreshold != -1 && (majority - min) > gapThreshold) { |
        // If the gap between majority and min (the slow follower) is greater than gapThreshold,
        // set the majority to min, which will skip one round of lastCommittedIndex update in updateCommit().
| majority = min; |
| } |
| return new MinMajorityMax(min, majority, getMax(sorted)); |
| } |
| |
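    // Example: for sorted = {3, 5, 9}, getMajority returns sorted[(3 - 1) / 2] = 5, the
    // highest index replicated on a majority (2 of the 3 servers); getMax returns 9.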
| static long getMajority(long[] sorted) { |
| return sorted[(sorted.length - 1) / 2]; |
| } |
| |
| static long getMax(long[] sorted) { |
| return sorted[sorted.length - 1]; |
| } |
| } |
| |
| private void updateCommit() { |
| getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getFlushIndex, |
| followerMaxGapThreshold) |
| .ifPresent(m -> updateCommit(m.majority, m.min)); |
| } |
| |
| private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> followerIndex, LongSupplier logIndex) { |
| return getMajorityMin(followerIndex, logIndex, -1); |
| } |
| |
| private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> followerIndex, |
| LongSupplier logIndex, long gapThreshold) { |
| final RaftPeerId selfId = server.getId(); |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| |
| final List<RaftPeerId> followers = voterLists.get(0); |
| final boolean includeSelf = conf.containsInConf(selfId); |
| if (followers.isEmpty() && !includeSelf) { |
| return Optional.empty(); |
| } |
| |
| final long[] indicesInNewConf = getSorted(followers, includeSelf, followerIndex, logIndex); |
| final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf, gapThreshold); |
| |
| if (!conf.isTransitional()) { |
| return Optional.of(newConf); |
| } else { // configuration is in transitional state |
| final List<RaftPeerId> oldFollowers = voterLists.get(1); |
| final boolean includeSelfInOldConf = conf.containsInOldConf(selfId); |
| if (oldFollowers.isEmpty() && !includeSelfInOldConf) { |
| return Optional.empty(); |
| } |
| |
| final long[] indicesInOldConf = getSorted(oldFollowers, includeSelfInOldConf, followerIndex, logIndex); |
| final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf, followerMaxGapThreshold); |
| return Optional.of(newConf.combine(oldConf)); |
| } |
| } |
| |
| private void updateCommit(LogEntryHeader[] entriesToCommit) { |
| final long newCommitIndex = raftLog.getLastCommittedIndex(); |
| logMetadata(newCommitIndex); |
| commitIndexChanged(); |
| |
| boolean hasConfiguration = false; |
| for (LogEntryHeader entry : entriesToCommit) { |
| if (entry.getIndex() > newCommitIndex) { |
| break; |
| } |
| hasConfiguration |= entry.getLogEntryBodyCase() == LogEntryBodyCase.CONFIGURATIONENTRY; |
| raftLog.getRaftLogMetrics().onLogEntryCommitted(entry); |
| } |
| if (hasConfiguration) { |
| checkAndUpdateConfiguration(); |
| } |
| } |
| |
| private void updateCommit(long majority, long min) { |
| final long oldLastCommitted = raftLog.getLastCommittedIndex(); |
| if (majority > oldLastCommitted) { |
| // Get the headers before updating commit index since the log can be purged after a snapshot |
| final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1); |
| |
| if (server.getState().updateCommitIndex(majority, currentTerm, true)) { |
| updateCommit(entriesToCommit); |
| } |
| } |
| watchRequests.update(ReplicationLevel.ALL, min); |
| } |
| |
| private void logMetadata(long commitIndex) { |
| raftLog.appendMetadata(currentTerm, commitIndex); |
| notifySenders(); |
| } |
| |
| private void checkAndUpdateConfiguration() { |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| if (conf.isTransitional()) { |
| replicateNewConf(); |
| } else { // the (new) log entry has been committed |
| pendingRequests.replySetConfiguration(server::newSuccessReply); |
| // if the leader is not included in the current configuration, step down |
| if (!conf.containsInConf(server.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) { |
| LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf); |
| try { |
| // leave some time for all RPC senders to send out new conf entry |
| server.properties().minRpcTimeout().sleep(); |
| } catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| // the pending request handler will send NotLeaderException for |
| // pending client requests when it stops |
| server.close(); |
| } |
| } |
| } |
| |
| /** |
| * when the (old, new) log entry has been committed, should replicate (new): |
| * 1) append (new) to log |
| * 2) update conf to (new) |
| * 3) update RpcSenders list |
| * 4) start replicating the log entry |
| */ |
| private void replicateNewConf() { |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| final RaftConfigurationImpl newConf = RaftConfigurationImpl.newBuilder() |
| .setConf(conf) |
| .setLogEntryIndex(raftLog.getNextIndex()) |
| .build(); |
| // stop the LogAppender if the corresponding follower and listener is no longer in the conf |
| updateSenders(newConf); |
| long index = raftLog.append(server.getState().getCurrentTerm(), newConf); |
| updateConfiguration(index, newConf); |
| notifySenders(); |
| } |
| |
  private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) {
    final List<FollowerInfo> followerInfos = new ArrayList<>();
    for (RaftPeerId id : followerIDs) {
      final FollowerInfo info = peerIdFollowerInfoMap.get(id);
      if (info == null) {
        throw new IllegalArgumentException("RaftPeerId:" + id +
            " not in peerIdFollowerInfoMap of leader:" + server.getMemberId());
      }
      followerInfos.add(info);
    }
    return followerInfos;
  }
| |
| private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf, |
| ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) { |
| final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size(); |
| if (length == 0) { |
| throw new IllegalArgumentException("followers.size() == " |
| + followerIDs.size() + " and includeSelf == " + includeSelf); |
| } |
| |
| final long[] indices = new long[length]; |
| List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs); |
| for (int i = 0; i < followerInfos.size(); i++) { |
| indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i)); |
| } |
| |
| if (includeSelf) { |
| // note that we also need to wait for the local disk I/O |
| indices[length - 1] = getLogIndex.getAsLong(); |
| } |
| |
| Arrays.sort(indices); |
| return indices; |
| } |
| |
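  /**
   * Divide the voters into lists: lists.get(0) contains the voters in the current
   * (new) configuration; if the configuration is transitional, lists.get(1) also
   * contains the voters in the old configuration.
   */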
| private List<List<RaftPeerId>> divideFollowers(RaftConfigurationImpl conf) { |
| List<List<RaftPeerId>> lists = new ArrayList<>(2); |
| List<RaftPeerId> listForNew = senders.stream() |
| .map(LogAppender::getFollowerId) |
| .filter(conf::containsInConf) |
| .collect(Collectors.toList()); |
| lists.add(listForNew); |
| if (conf.isTransitional()) { |
| List<RaftPeerId> listForOld = senders.stream() |
| .map(LogAppender::getFollowerId) |
| .filter(conf::containsInOldConf) |
| .collect(Collectors.toList()); |
| lists.add(listForOld); |
| } |
| return lists; |
| } |
| |
| private void yieldLeaderToHigherPriorityPeer() { |
| if (!server.getInfo().isLeader()) { |
| return; |
| } |
| |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| final RaftPeer leader = conf.getPeer(server.getId()); |
| if (leader == null) { |
| LOG.error("{} the leader {} is not in the conf {}", this, server.getId(), conf); |
| return; |
| } |
| int leaderPriority = leader.getPriority(); |
| |
| for (LogAppender logAppender : senders.getSenders()) { |
| final FollowerInfo followerInfo = logAppender.getFollower(); |
| final RaftPeerId followerID = followerInfo.getPeer().getId(); |
| final RaftPeer follower = conf.getPeer(followerID); |
| if (follower == null) { |
| if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) { |
| LOG.error("{} the follower {} is not in the conf {}", this, followerID, conf); |
| } |
| continue; |
| } |
| final int followerPriority = follower.getPriority(); |
| if (followerPriority <= leaderPriority) { |
| continue; |
| } |
| final TermIndex leaderLastEntry = server.getState().getLastEntry(); |
| if (leaderLastEntry == null) { |
| LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " + |
| "is higher than leader's:{} and leader's lastEntry is null", |
| this, followerID, currentTerm, followerPriority, leaderPriority); |
| |
| sendStartLeaderElectionToHigherPriorityPeer(followerID, null); |
| return; |
| } |
| |
| if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) { |
| LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " + |
| "is higher than leader's:{} and follower's lastEntry index:{} catch up with leader's:{}", |
| this, followerID, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(), |
| leaderLastEntry.getIndex()); |
| sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry); |
| return; |
| } |
| } |
| } |
| |
| /** |
| * See the thesis section 6.2: A leader in Raft steps down |
| * if an election timeout elapses without a successful |
| * round of heartbeats to a majority of its cluster. |
| */ |
| |
| public boolean checkLeadership() { |
| if (!server.getInfo().isLeader()) { |
| return false; |
| } |
    // The initial value of lastRpcResponseTime in FollowerInfo is set by
    // LeaderState::addSenders(); it is artificial and only used to trigger an
    // immediate round of AppendEntries requests. Since a candidate collects
    // votes from a majority, without seeing a higher term, before becoming
    // leader, a new leader may be considered legitimate for one election
    // timeout after it becomes leader.
| if (server.getRole().getRoleElapsedTimeMs() < server.getMaxTimeoutMs()) { |
| return true; |
| } |
| |
| final List<RaftPeerId> activePeers = senders.stream() |
| .filter(sender -> sender.getFollower() |
| .getLastRpcResponseTime() |
| .elapsedTimeMs() <= server.getMaxTimeoutMs()) |
| .map(LogAppender::getFollowerId) |
| .collect(Collectors.toList()); |
| |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| |
| if (conf.hasMajority(activePeers, server.getId())) { |
| // leadership check passed |
| return true; |
| } |
| |
| LOG.warn(this + ": Lost leadership on term: " + currentTerm |
| + ". Election timeout: " + server.getMaxTimeoutMs() + "ms" |
| + ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms" |
| + ". Conf: " + conf); |
| senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f)); |
| |
| // step down as follower |
| stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS); |
| return false; |
| } |
| |
| void replyPendingRequest(long logIndex, RaftClientReply reply) { |
| pendingRequests.replyPendingRequest(logIndex, reply); |
| } |
| |
| TransactionContext getTransactionContext(long index) { |
| return pendingRequests.getTransactionContext(index); |
| } |
| |
| long[] getFollowerNextIndices() { |
| return senders.stream().mapToLong(s -> s.getFollower().getNextIndex()).toArray(); |
| } |
| |
| static Map<RaftPeerId, RaftPeer> newMap(Collection<RaftPeer> peers, String str) { |
| Objects.requireNonNull(peers, () -> str + " == null"); |
| final Map<RaftPeerId, RaftPeer> map = new HashMap<>(); |
| for(RaftPeer p : peers) { |
| map.put(p.getId(), p); |
| } |
| return Collections.unmodifiableMap(map); |
| } |
| |
| private class ConfigurationStagingState { |
| private final String name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| private final Map<RaftPeerId, RaftPeer> newPeers; |
| private final Map<RaftPeerId, RaftPeer> newListeners; |
| private final PeerConfiguration newConf; |
| |
| ConfigurationStagingState(Collection<RaftPeer> newPeers, Collection<RaftPeer> newListeners, |
| PeerConfiguration newConf) { |
| this.newPeers = newMap(newPeers, "peer"); |
| this.newListeners = newMap(newListeners, "listeners"); |
| this.newConf = newConf; |
| } |
| |
| RaftConfigurationImpl generateOldNewConf(RaftConfigurationImpl current, long logIndex) { |
| return RaftConfigurationImpl.newBuilder() |
| .setConf(newConf) |
| .setOldConf(current) |
| .setLogEntryIndex(logIndex) |
| .build(); |
| } |
| |
| Collection<RaftPeer> getNewPeers() { |
| return newPeers.values(); |
| } |
| |
| Collection<RaftPeer> getNewListeners() { |
| return newListeners.values(); |
| } |
| |
| boolean contains(RaftPeerId peerId) { |
| return newPeers.containsKey(peerId) || newListeners.containsKey(peerId); |
| } |
| |
| void fail(BootStrapProgress progress) { |
| final String message = this + ": Fail to set configuration " + newConf + " due to " + progress; |
| LOG.debug(message); |
| stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower())); |
| |
| stagingState = null; |
| // send back failure response to client's request |
| pendingRequests.failSetConfiguration(new ReconfigurationTimeoutException(message)); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| /** |
| * @return the RaftPeer (address and id) information of the followers. |
| */ |
| List<RaftPeer> getFollowers() { |
| return Collections.unmodifiableList(senders.stream() |
| .map(sender -> sender.getFollower().getPeer()) |
| .filter(peer -> server.getRaftConf().containsInConf(peer.getId())) |
| .collect(Collectors.toList())); |
| } |
| |
| Stream<LogAppender> getLogAppenders() { |
| return senders.stream(); |
| } |
| |
| private static boolean isAttendingVote(FollowerInfo follower) { |
| return ((FollowerInfoImpl)follower).isAttendingVote(); |
| } |
| |
| @Override |
| public void checkHealth(FollowerInfo follower) { |
| final TimeDuration elapsedTime = follower.getLastRpcResponseTime().elapsedTime(); |
| if (elapsedTime.compareTo(server.properties().rpcSlownessTimeout()) > 0) { |
| final RoleInfoProto leaderInfo = server.getInfo().getRoleInfoProto(); |
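      // Notify via both overloads; the no-peer variant is presumably kept for
      // backward compatibility with state machines implementing the older API.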
| server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo); |
| server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo, follower.getPeer()); |
| } |
| final RaftPeerId followerId = follower.getPeer().getId(); |
| raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS)); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |