| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.server.impl; |
| |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.proto.RaftProtos; |
| import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; |
| import org.apache.ratis.proto.RaftProtos.CommitInfoProto; |
| import org.apache.ratis.proto.RaftProtos.LogEntryProto; |
| import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; |
| import org.apache.ratis.proto.RaftProtos.RaftPeerRole; |
| import org.apache.ratis.proto.RaftProtos.ReplicationLevel; |
| import org.apache.ratis.proto.RaftProtos.RoleInfoProto; |
| import org.apache.ratis.protocol.Message; |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.RaftClientRequest; |
| import org.apache.ratis.protocol.RaftPeer; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.protocol.SetConfigurationRequest; |
| import org.apache.ratis.protocol.TransferLeadershipRequest; |
| import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; |
| import org.apache.ratis.protocol.exceptions.NotLeaderException; |
| import org.apache.ratis.protocol.exceptions.NotReplicatedException; |
| import org.apache.ratis.protocol.exceptions.ReadIndexException; |
| import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener; |
| import org.apache.ratis.server.leader.FollowerInfo; |
| import org.apache.ratis.server.leader.LeaderState; |
| import org.apache.ratis.server.leader.LogAppender; |
| import org.apache.ratis.server.metrics.LogAppenderMetrics; |
| import org.apache.ratis.server.metrics.RaftServerMetricsImpl; |
| import org.apache.ratis.server.protocol.TermIndex; |
| import org.apache.ratis.server.raftlog.LogEntryHeader; |
| import org.apache.ratis.server.raftlog.LogProtoUtils; |
| import org.apache.ratis.server.raftlog.RaftLog; |
| import org.apache.ratis.statemachine.TransactionContext; |
| import org.apache.ratis.util.CodeInjectionForTesting; |
| import org.apache.ratis.util.CollectionUtils; |
| import org.apache.ratis.util.Daemon; |
| import org.apache.ratis.util.JavaUtils; |
| import org.apache.ratis.util.MemoizedSupplier; |
| import org.apache.ratis.util.Preconditions; |
| import org.apache.ratis.util.ReferenceCountedObject; |
| import org.apache.ratis.util.TimeDuration; |
| import org.apache.ratis.util.Timestamp; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.LongSupplier; |
| import java.util.function.Predicate; |
| import java.util.function.ToLongFunction; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import java.util.stream.StreamSupport; |
| |
| import static org.apache.ratis.server.RaftServer.Division.LOG; |
| import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY; |
| |
| /** |
| * States for leader only. It contains three different types of processors: |
| * 1. RPC senders: each thread is appending log to a follower |
| * 2. EventProcessor: a single thread updating the raft server's state based on |
| * status of log appending response |
| * 3. PendingRequestHandler: a handler sending back responses to clients when |
| * corresponding log entries are committed |
| */ |
| class LeaderStateImpl implements LeaderState { |
| public static final String APPEND_PLACEHOLDER = JavaUtils.getClassSimpleName(LeaderState.class) + ".placeholder"; |
| |
| private enum BootStrapProgress { |
| NOPROGRESS, PROGRESSING, CAUGHTUP |
| } |
| |
| static class StateUpdateEvent { |
| private enum Type { |
| STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING |
| } |
| |
| private final Type type; |
| private final long newTerm; |
| private final Runnable handler; |
| |
| StateUpdateEvent(Type type, long newTerm, Runnable handler) { |
| this.type = type; |
| this.newTerm = newTerm; |
| this.handler = handler; |
| } |
| |
| void execute() { |
| handler.run(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == this) { |
| return true; |
| } else if (!(obj instanceof StateUpdateEvent)) { |
| return false; |
| } |
| final StateUpdateEvent that = (StateUpdateEvent)obj; |
| return this.type == that.type && this.newTerm == that.newTerm; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(type, newTerm); |
| } |
| |
| @Override |
| public String toString() { |
| return type + (newTerm >= 0? ":" + newTerm: ""); |
| } |
| } |
| |
| private class EventQueue { |
| private final String name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| private final BlockingQueue<StateUpdateEvent> queue = new ArrayBlockingQueue<>(4096); |
| |
| void submit(StateUpdateEvent event) { |
| try { |
| queue.put(event); |
| } catch (InterruptedException e) { |
| LOG.info("{}: Interrupted when submitting {} ", this, event); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| StateUpdateEvent poll() { |
| final StateUpdateEvent e; |
| try { |
| e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| String s = this + ": poll() is interrupted"; |
| if (isStopped.get()) { |
| LOG.info(s + " gracefully"); |
| return null; |
| } else { |
| throw new IllegalStateException(s + " UNEXPECTEDLY", ie); |
| } |
| } |
| |
| if (e != null) { |
| // remove duplicated events from the head. |
| while(e.equals(queue.peek())) { |
| queue.poll(); |
| } |
| } |
| return e; |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| /** |
| * Use {@link CopyOnWriteArrayList} to implement a thread-safe list. |
| * Since each mutation induces a copy of the list, only bulk operations |
| * (addAll and removeAll) are supported. |
| */ |
| static class SenderList implements Iterable<LogAppender> { |
| private final List<LogAppender> senders; |
| |
| SenderList() { |
| this.senders = new CopyOnWriteArrayList<>(); |
| } |
| |
| @Override |
| public Iterator<LogAppender> iterator() { |
| return senders.iterator(); |
| } |
| |
| void addAll(Collection<LogAppender> newSenders) { |
| if (newSenders.isEmpty()) { |
| return; |
| } |
| |
| Preconditions.assertUnique( |
| CollectionUtils.as(senders, LogAppender::getFollowerId), |
| CollectionUtils.as(newSenders, LogAppender::getFollowerId)); |
| |
| final boolean changed = senders.addAll(newSenders); |
| Preconditions.assertTrue(changed); |
| } |
| |
| boolean removeAll(Collection<LogAppender> c) { |
| return senders.removeAll(c); |
| } |
| |
| CompletableFuture<Void> stopAll() { |
| final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()]; |
| for(int i = 0; i < futures.length; i++) { |
| futures[i] = senders.get(i).stopAsync(); |
| } |
| return CompletableFuture.allOf(futures); |
| } |
| } |
| |
| /** For caching {@link FollowerInfo}s. This class is immutable. */ |
| static class CurrentOldFollowerInfos { |
| private final RaftConfigurationImpl conf; |
| private final List<FollowerInfo> current; |
| private final List<FollowerInfo> old; |
| |
| CurrentOldFollowerInfos(RaftConfigurationImpl conf, List<FollowerInfo> current, List<FollowerInfo> old) { |
| // set null when the sizes are not the same so that it will update next time. |
| this.conf = isSameSize(current, conf.getConf()) && isSameSize(old, conf.getOldConf())? conf: null; |
| this.current = Collections.unmodifiableList(current); |
| this.old = old == null? null: Collections.unmodifiableList(old); |
| } |
| |
| RaftConfigurationImpl getConf() { |
| return conf; |
| } |
| |
| List<FollowerInfo> getCurrent() { |
| return current; |
| } |
| |
| List<FollowerInfo> getOld() { |
| return old; |
| } |
| } |
| |
| static boolean isSameSize(List<FollowerInfo> infos, PeerConfiguration conf) { |
| return conf == null? infos == null: conf.size() == infos.size(); |
| } |
| |
| /** Use == to compare if the confs are the same object. */ |
| static boolean isSameConf(CurrentOldFollowerInfos cached, RaftConfigurationImpl conf) { |
| return cached != null && cached.getConf() == conf; |
| } |
| |
| static class FollowerInfoMap { |
| private final Map<RaftPeerId, FollowerInfo> map = new ConcurrentHashMap<>(); |
| |
| private volatile CurrentOldFollowerInfos followerInfos; |
| |
| void put(RaftPeerId id, FollowerInfo info) { |
| map.put(id, info); |
| } |
| |
| CurrentOldFollowerInfos getFollowerInfos(RaftConfigurationImpl conf) { |
| final CurrentOldFollowerInfos cached = followerInfos; |
| if (isSameConf(cached, conf)) { |
| return cached; |
| } |
| |
| return update(conf); |
| } |
| |
| synchronized CurrentOldFollowerInfos update(RaftConfigurationImpl conf) { |
| if (!isSameConf(followerInfos, conf)) { // compare again synchronized |
| followerInfos = new CurrentOldFollowerInfos(conf, getFollowerInfos(conf.getConf()), |
| Optional.ofNullable(conf.getOldConf()).map(this::getFollowerInfos).orElse(null)); |
| } |
| return followerInfos; |
| } |
| |
| private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) { |
| return peers.streamPeerIds().map(map::get).filter(Objects::nonNull).collect(Collectors.toList()); |
| } |
| } |
| |
| private class StartupLogEntry { |
| /** The log index at leader startup. */ |
| private final long startIndex = appendConfiguration(RaftConfigurationImpl.newBuilder() |
| .setConf(server.getRaftConf().getConf()) |
| .setLogEntryIndex(raftLog.getNextIndex()) |
| .build()); |
| /** This future will be completed after the log entry is applied. */ |
| private final CompletableFuture<Long> appliedIndexFuture = new CompletableFuture<>(); |
| |
| CompletableFuture<Long> getAppliedIndexFuture() { |
| return appliedIndexFuture; |
| } |
| |
| boolean isApplied(LogEntryProto logEntry) { |
| if (appliedIndexFuture.isDone()) { |
| return true; |
| } |
| final long appliedIndex = logEntry != null? logEntry.getIndex(): server.getState().getLastAppliedIndex(); |
| if (appliedIndex >= startIndex) { |
| appliedIndexFuture.complete(appliedIndex); |
| LOG.info("leader is ready since appliedIndex == {} >= startIndex == {}", |
| appliedIndex, startIndex); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| } |
| |
| private final StateUpdateEvent updateCommitEvent = |
| new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit); |
| private final StateUpdateEvent checkStagingEvent = |
| new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1, this::checkStaging); |
| |
| private final String name; |
| private final RaftServerImpl server; |
| private final RaftLog raftLog; |
| private final long currentTerm; |
| private volatile ConfigurationStagingState stagingState; |
| |
| private final FollowerInfoMap followerInfoMap = new FollowerInfoMap(); |
| |
| /** |
| * The list of threads appending entries to followers. |
| * The list is protected by the RaftServer's lock. |
| */ |
| private final SenderList senders; |
| private final EventQueue eventQueue; |
| private final EventProcessor processor; |
| private final PendingRequests pendingRequests; |
| private final WatchRequests watchRequests; |
| private final MessageStreamRequests messageStreamRequests; |
| |
| private final MemoizedSupplier<StartupLogEntry> startupLogEntry = MemoizedSupplier.valueOf(StartupLogEntry::new); |
| private final AtomicBoolean isStopped = new AtomicBoolean(); |
| |
| private final int stagingCatchupGap; |
| private final RaftServerMetricsImpl raftServerMetrics; |
| private final LogAppenderMetrics logAppenderMetrics; |
| private final long followerMaxGapThreshold; |
| private final PendingStepDown pendingStepDown; |
| |
| private final ReadIndexHeartbeats readIndexHeartbeats; |
| private final LeaderLease lease; |
| |
| LeaderStateImpl(RaftServerImpl server) { |
| this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| this.server = server; |
| |
| final RaftProperties properties = server.getRaftServer().getProperties(); |
| stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties); |
| |
| final ServerState state = server.getState(); |
| this.raftLog = state.getLog(); |
| this.currentTerm = state.getCurrentTerm(); |
| |
| this.eventQueue = new EventQueue(); |
| processor = new EventProcessor(this.name, server); |
| raftServerMetrics = server.getRaftServerMetrics(); |
| logAppenderMetrics = new LogAppenderMetrics(server.getMemberId()); |
| this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics); |
| this.watchRequests = new WatchRequests(server.getMemberId(), properties, raftServerMetrics); |
| this.messageStreamRequests = new MessageStreamRequests(server.getMemberId()); |
| this.pendingStepDown = new PendingStepDown(this); |
| this.readIndexHeartbeats = new ReadIndexHeartbeats(); |
| this.lease = new LeaderLease(properties); |
| long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties); |
| double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties); |
| |
| if (followerGapRatioMax == -1) { |
| this.followerMaxGapThreshold = -1; |
| } else if (followerGapRatioMax > 1f || followerGapRatioMax <= 0f) { |
| throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY + |
| "s value must between [1, 0) to enable the feature"); |
| } else { |
| this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests); |
| } |
| |
| final RaftConfigurationImpl conf = state.getRaftConf(); |
| Collection<RaftPeer> others = conf.getOtherPeers(server.getId()); |
| |
| final long nextIndex = raftLog.getNextIndex(); |
| senders = new SenderList(); |
| addSenders(others, nextIndex, true); |
| |
| final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER); |
| if (!listeners.isEmpty()) { |
| addSenders(listeners, nextIndex, true); |
| } |
| } |
| |
| void start() { |
| // In the beginning of the new term, replicate a conf entry in order |
| // to finally commit entries in the previous term. |
| // Also this message can help identify the last committed index and the conf. |
| CodeInjectionForTesting.execute(APPEND_PLACEHOLDER, |
| server.getId().toString(), null); |
| // Initialize startup log entry and append it to the RaftLog |
| startupLogEntry.get(); |
| processor.start(); |
| senders.forEach(LogAppender::start); |
| } |
| |
| boolean isReady() { |
| return startupLogEntry.isInitialized() && startupLogEntry.get().isApplied(null); |
| } |
| |
| void checkReady(LogEntryProto entry) { |
| Preconditions.assertTrue(startupLogEntry.isInitialized()); |
| if (entry.getTerm() == getCurrentTerm() && startupLogEntry.get().isApplied(entry)) { |
| server.getStateMachine().leaderEvent().notifyLeaderReady(); |
| } |
| } |
| |
| CompletableFuture<Void> stop() { |
| if (!isStopped.compareAndSet(false, true)) { |
| LOG.info("{} is already stopped", this); |
| return CompletableFuture.completedFuture(null); |
| } |
| // do not interrupt event processor since it may be in the middle of logSync |
| final CompletableFuture<Void> f = senders.stopAll(); |
| final NotLeaderException nle = server.generateNotLeaderException(); |
| final Collection<CommitInfoProto> commitInfos = server.getCommitInfos(); |
| try { |
| final Collection<TransactionContext> transactions = pendingRequests.sendNotLeaderResponses(nle, commitInfos); |
| server.getStateMachine().leaderEvent().notifyNotLeader(transactions); |
| watchRequests.failWatches(nle); |
| } catch (IOException e) { |
| LOG.warn("{}: Caught exception in sendNotLeaderResponses", this, e); |
| } |
| messageStreamRequests.clear(); |
| readIndexHeartbeats.failListeners(nle); |
| lease.getAndSetEnabled(false); |
| startupLogEntry.get().getAppliedIndexFuture().completeExceptionally( |
| new ReadIndexException("failed to obtain read index since: ", nle)); |
| server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); |
| logAppenderMetrics.unregister(); |
| raftServerMetrics.unregister(); |
| pendingRequests.close(); |
| watchRequests.close(); |
| return f; |
| } |
| |
| void notifySenders() { |
| senders.forEach(LogAppender::notifyLogAppender); |
| } |
| |
| boolean inStagingState() { |
| return stagingState != null; |
| } |
| |
| long getCurrentTerm() { |
| Preconditions.assertSame(currentTerm, server.getState().getCurrentTerm(), "currentTerm"); |
| return currentTerm; |
| } |
| |
| @Override |
| public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { |
| if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) { |
| submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Start bootstrapping new peers |
| */ |
| PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) { |
| LOG.info("{}: startSetConfiguration {}", this, request); |
| Preconditions.assertTrue(isRunning(), () -> this + " is not running."); |
| Preconditions.assertTrue(!inStagingState(), () -> this + " is already in staging state " + stagingState); |
| |
| final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER); |
| final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf); |
| final Collection<RaftPeer> listenersToBootStrap= server.getRaftConf().filterNotContainedInConf(listenersInNewConf); |
| |
| // add the request to the pending queue |
| final PendingRequest pending = pendingRequests.addConfRequest(request); |
| |
| ConfigurationStagingState configurationStagingState = new ConfigurationStagingState( |
| peersToBootStrap, listenersToBootStrap, new PeerConfiguration(peersInNewConf, listenersInNewConf)); |
| Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers(); |
| Collection<RaftPeer> newListeners = configurationStagingState.getNewListeners(); |
| Collection<RaftPeer> allNew = newListeners.isEmpty() |
| ? newPeers |
| : newPeers.isEmpty() |
| ? newListeners |
| : Stream.concat(newPeers.stream(), newListeners.stream()) |
| .collect(Collectors.toList()); |
| |
| if (allNew.isEmpty()) { |
| applyOldNewConf(configurationStagingState); |
| } else { |
| // update the LeaderState's sender list |
| Collection<LogAppender> newAppenders = addSenders(allNew); |
| |
| // set the staging state |
| stagingState = configurationStagingState; |
| |
| newAppenders.forEach(LogAppender::start); |
| } |
| |
| return pending; |
| } |
| |
| PendingRequests.Permit tryAcquirePendingRequest(Message message) { |
| return pendingRequests.tryAcquire(message); |
| } |
| |
| PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: addPendingRequest at {}, entry={}", this, request, |
| LogProtoUtils.toLogEntryString(entry.getLogEntry())); |
| } |
| return pendingRequests.add(permit, request, entry); |
| } |
| |
| CompletableFuture<RaftClientReply> streamAsync(ReferenceCountedObject<RaftClientRequest> requestRef) { |
| RaftClientRequest request = requestRef.get(); |
| return messageStreamRequests.streamAsync(requestRef) |
| .thenApply(dummy -> server.newSuccessReply(request)) |
| .exceptionally(e -> exception2RaftClientReply(request, e)); |
| } |
| |
| CompletableFuture<ReferenceCountedObject<RaftClientRequest>> streamEndOfRequestAsync( |
| ReferenceCountedObject<RaftClientRequest> requestRef) { |
| return messageStreamRequests.streamEndOfRequestAsync(requestRef); |
| } |
| |
| CompletableFuture<RaftClientReply> addWatchRequest(RaftClientRequest request) { |
| LOG.debug("{}: addWatchRequest {}", this, request); |
| return watchRequests.add(request) |
| .thenApply(logIndex -> server.newSuccessReply(request, logIndex)) |
| .exceptionally(e -> exception2RaftClientReply(request, e)); |
| } |
| |
| private RaftClientReply exception2RaftClientReply(RaftClientRequest request, Throwable e) { |
| e = JavaUtils.unwrapCompletionException(e); |
| if (e instanceof NotReplicatedException) { |
| final NotReplicatedException nre = (NotReplicatedException)e; |
| return server.newReplyBuilder(request) |
| .setException(nre) |
| .setLogIndex(nre.getLogIndex()) |
| .build(); |
| } else if (e instanceof NotLeaderException) { |
| return server.newExceptionReply(request, (NotLeaderException)e); |
| } else if (e instanceof LeaderNotReadyException) { |
| return server.newExceptionReply(request, (LeaderNotReadyException)e); |
| } else { |
| throw new CompletionException(e); |
| } |
| } |
| |
| @Override |
| public void onFollowerCommitIndex(FollowerInfo follower, long commitIndex) { |
| if (follower.updateCommitIndex(commitIndex)) { |
| commitIndexChanged(); |
| } |
| } |
| |
| private void commitIndexChanged() { |
| getMajorityMin(FollowerInfo::getCommitIndex, raftLog::getLastCommittedIndex).ifPresent(m -> { |
| // Normally, leader commit index is always ahead of followers. |
| // However, after a leader change, the new leader commit index may |
| // be behind some followers in the beginning. |
| watchRequests.update(ReplicationLevel.ALL_COMMITTED, m.min); |
| watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, m.majority); |
| watchRequests.update(ReplicationLevel.MAJORITY, m.max); |
| }); |
| notifySenders(); |
| } |
| |
| private void applyOldNewConf(ConfigurationStagingState stage) { |
| final ServerState state = server.getState(); |
| final RaftConfigurationImpl current = state.getRaftConf(); |
| final long nextIndex = state.getLog().getNextIndex(); |
| final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current, nextIndex); |
| // apply the (old, new) configuration to log, and use it as the current conf |
| appendConfiguration(oldNewConf); |
| |
| notifySenders(); |
| } |
| |
| private long appendConfiguration(RaftConfigurationImpl conf) { |
| final long logIndex = raftLog.append(getCurrentTerm(), conf); |
| Preconditions.assertSame(conf.getLogEntryIndex(), logIndex, "confLogIndex"); |
| server.getState().setRaftConf(conf); |
| return logIndex; |
| } |
| |
| void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) { |
| for (LogAppender sender : senders) { |
| FollowerInfo info = sender.getFollower(); |
| protos.add(cache.update(info.getPeer(), info.getCommitIndex())); |
| } |
| } |
| |
| @Override |
| public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower, |
| List<LogEntryProto> entries, TermIndex previous, long callId) { |
| final boolean initializing = !isCaughtUp(follower); |
| final RaftPeerId targetId = follower.getId(); |
| return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries, |
| ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()), |
| initializing, previous, server.getCommitInfos(), callId); |
| } |
| |
| /** |
| * Update sender list for setConfiguration request |
| */ |
| private void addAndStartSenders(Collection<RaftPeer> newPeers) { |
| addSenders(newPeers).forEach(LogAppender::start); |
| } |
| |
| private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers) { |
| return !newPeers.isEmpty() |
| ? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false) |
| : Collections.emptyList(); |
| } |
| |
| private RaftPeer getPeer(RaftPeerId id) { |
| return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER); |
| } |
| |
| private LogAppender newLogAppender(FollowerInfo f) { |
| return server.getRaftServer().getFactory().newLogAppender(server, this, f); |
| } |
| |
| private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean caughtUp) { |
| final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); |
| final List<LogAppender> newAppenders = newPeers.stream().map(peer -> { |
| final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, caughtUp); |
| followerInfoMap.put(peer.getId(), f); |
| raftServerMetrics.addFollower(peer.getId()); |
| logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime); |
| return newLogAppender(f); |
| }).collect(Collectors.toList()); |
| senders.addAll(newAppenders); |
| return newAppenders; |
| } |
| |
| private void stopAndRemoveSenders(Predicate<LogAppender> predicate) { |
| stopAndRemoveSenders(getLogAppenders().filter(predicate).collect(Collectors.toList())); |
| } |
| |
| private void stopAndRemoveSenders(Collection<LogAppender> toStop) { |
| toStop.forEach(LogAppender::stopAsync); |
| senders.removeAll(toStop); |
| } |
| |
| boolean isRunning() { |
| if (isStopped.get()) { |
| return false; |
| } |
| final LeaderStateImpl current = server.getRole().getLeaderState().orElse(null); |
| return this == current; |
| } |
| |
| @Override |
| public void restart(LogAppender sender) { |
| if (!isRunning()) { |
| LOG.warn("Failed to restart {}: {} is not running", sender, this); |
| return; |
| } |
| |
| final FollowerInfo info = sender.getFollower(); |
| LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName()); |
| stopAndRemoveSenders(Collections.singleton(sender)); |
| |
| Optional.ofNullable(getPeer(info.getId())) |
| .ifPresent(peer -> addAndStartSenders(Collections.singleton(peer))); |
| } |
| |
| /** |
| * Update the RpcSender list based on the current configuration |
| */ |
| private void updateSenders(RaftConfigurationImpl conf) { |
| Preconditions.assertTrue(conf.isStable() && !inStagingState()); |
| stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)); |
| } |
| |
| void submitStepDownEvent(StepDownReason reason) { |
| submitStepDownEvent(getCurrentTerm(), reason); |
| } |
| |
| void submitStepDownEvent(long term, StepDownReason reason) { |
| eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, term, () -> stepDown(term, reason))); |
| } |
| |
| private void stepDown(long term, StepDownReason reason) { |
| try { |
| lease.getAndSetEnabled(false); |
| server.changeToFollowerAndPersistMetadata(term, false, reason); |
| pendingStepDown.complete(server::newSuccessReply); |
| } catch(IOException e) { |
| final String s = this + ": Failed to persist metadata for term " + term; |
| LOG.warn(s, e); |
| // the failure should happen while changing the state to follower |
| // thus the in-memory state should have been updated |
| if (!isStopped.get()) { |
| throw new IllegalStateException(s + " and running == true", e); |
| } |
| } |
| } |
| |
| CompletableFuture<RaftClientReply> submitStepDownRequestAsync(TransferLeadershipRequest request) { |
| return pendingStepDown.submitAsync(request); |
| } |
| |
| private static LogAppender chooseUpToDateFollower(List<LogAppender> followers, TermIndex leaderLastEntry) { |
| for(LogAppender f : followers) { |
| if (TransferLeadership.isFollowerUpToDate(f.getFollower(), leaderLastEntry) |
| == TransferLeadership.Result.SUCCESS) { |
| return f; |
| } |
| } |
| return null; |
| } |
| |
| private void prepare() { |
| synchronized (server) { |
| if (isRunning()) { |
| final ServerState state = server.getState(); |
| if (state.getRaftConf().isTransitional() && state.isConfCommitted()) { |
| // the configuration is in transitional state, and has been committed |
| // so it is time to generate and replicate (new) conf. |
| replicateNewConf(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * The processor thread takes the responsibility to update the raft server's |
| * state, such as changing to follower, or updating the committed index. |
| */ |
| private class EventProcessor extends Daemon { |
| public EventProcessor(String name, RaftServerImpl server) { |
| super(Daemon.newBuilder() |
| .setName(name).setThreadGroup(server.getThreadGroup())); |
| } |
| @Override |
| public void run() { |
| // apply an empty message; check if necessary to replicate (new) conf |
| prepare(); |
| |
| while (isRunning()) { |
| final StateUpdateEvent event = eventQueue.poll(); |
| synchronized(server) { |
| if (isRunning()) { |
| if (event != null) { |
| event.execute(); |
| } else if (inStagingState()) { |
| checkStaging(); |
| } else if (checkLeadership()) { |
| checkPeersForYieldingLeader(); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * So far we use a simple implementation for catchup checking: |
| * 1. If the latest rpc time of the remote peer is before 3 * max_timeout, |
| * the peer made no progress for that long. We should fail the whole |
| * setConfiguration request. |
| * 2. If the peer's matching index is just behind for a small gap, and the |
| * peer was updated recently (within max_timeout), declare the peer as |
| * caught-up. |
| * 3. Otherwise the peer is making progressing. Keep waiting. |
| */ |
| private BootStrapProgress checkProgress(FollowerInfo follower, long committed) { |
| Preconditions.assertTrue(!isCaughtUp(follower)); |
| final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); |
| final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs()); |
| if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { |
| LOG.debug("{} detects a follower {} timeout ({}ms) for bootstrapping", this, follower, |
| follower.getLastRpcResponseTime().elapsedTimeMs()); |
| return BootStrapProgress.NOPROGRESS; |
| } else if (follower.getMatchIndex() + stagingCatchupGap > committed |
| && follower.getLastRpcResponseTime().compareTo(progressTime) > 0 |
| && follower.hasAttemptedToInstallSnapshot()) { |
| return BootStrapProgress.CAUGHTUP; |
| } else { |
| return BootStrapProgress.PROGRESSING; |
| } |
| } |
| |
| @Override |
| public void onFollowerSuccessAppendEntries(FollowerInfo follower) { |
| if (isCaughtUp(follower)) { |
| submitUpdateCommitEvent(); |
| } else { |
| eventQueue.submit(checkStagingEvent); |
| } |
| server.getTransferLeadership().onFollowerAppendEntriesReply(follower); |
| } |
| |
| @Override |
| public boolean isFollowerBootstrapping(FollowerInfo follower) { |
| return isBootStrappingPeer(follower.getId()); |
| } |
| |
| private void checkStaging() { |
| if (!inStagingState()) { |
| // it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT |
| updateCommitEvent.execute(); |
| } else { |
| final long commitIndex = server.getState().getLog().getLastCommittedIndex(); |
| // check progress for the new followers |
| final List<FollowerInfoImpl> laggingFollowers = getLogAppenders() |
| .map(LogAppender::getFollower) |
| .filter(follower -> !isCaughtUp(follower)) |
| .map(FollowerInfoImpl.class::cast) |
| .collect(Collectors.toList()); |
| final EnumSet<BootStrapProgress> reports = laggingFollowers.stream() |
| .map(follower -> checkProgress(follower, commitIndex)) |
| .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class))); |
| if (reports.contains(BootStrapProgress.NOPROGRESS)) { |
| stagingState.fail(BootStrapProgress.NOPROGRESS); |
| } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { |
| // all caught up! |
| applyOldNewConf(stagingState); |
| this.stagingState = null; |
| laggingFollowers.stream() |
| .filter(f -> server.getRaftConf().containsInConf(f.getId())) |
| .forEach(FollowerInfoImpl::catchUp); |
| } |
| } |
| } |
| |
| boolean isBootStrappingPeer(RaftPeerId peerId) { |
| return Optional.ofNullable(stagingState).map(s -> s.contains(peerId)).orElse(false); |
| } |
| |
| void submitUpdateCommitEvent() { |
| eventQueue.submit(updateCommitEvent); |
| } |
| |
| static class MinMajorityMax { |
| private final long min; |
| private final long majority; |
| private final long max; |
| |
| MinMajorityMax(long min, long majority, long max) { |
| this.min = min; |
| this.majority = majority; |
| this.max = max; |
| } |
| |
| MinMajorityMax combine(MinMajorityMax that) { |
| return new MinMajorityMax( |
| Math.min(this.min, that.min), |
| Math.min(this.majority, that.majority), |
| Math.min(this.max, that.max)); |
| } |
| |
| static MinMajorityMax valueOf(long[] sorted) { |
| return new MinMajorityMax(sorted[0], getMajority(sorted), getMax(sorted)); |
| } |
| |
| static MinMajorityMax valueOf(long[] sorted, long gapThreshold) { |
| long majority = getMajority(sorted); |
| long min = sorted[0]; |
| if (gapThreshold != -1 && (majority - min) > gapThreshold) { |
| // The the gap between majority and min(the slow follower) is greater than gapThreshold, |
| // set the majority to min, which will skip one round of lastCommittedIndex update in updateCommit(). |
| majority = min; |
| } |
| return new MinMajorityMax(min, majority, getMax(sorted)); |
| } |
| |
| static long getMajority(long[] sorted) { |
| return sorted[(sorted.length - 1) / 2]; |
| } |
| |
| static long getMax(long[] sorted) { |
| return sorted[sorted.length - 1]; |
| } |
| } |
| |
| private void updateCommit() { |
| getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getFlushIndex, |
| followerMaxGapThreshold) |
| .ifPresent(m -> updateCommit(m.majority, m.min)); |
| } |
| |
| private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> followerIndex, LongSupplier logIndex) { |
| return getMajorityMin(followerIndex, logIndex, -1); |
| } |
| |
| private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> followerIndex, |
| LongSupplier logIndex, long gapThreshold) { |
| final RaftPeerId selfId = server.getId(); |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| |
| final CurrentOldFollowerInfos infos = followerInfoMap.getFollowerInfos(conf); |
| final List<FollowerInfo> followers = infos.getCurrent(); |
| final boolean includeSelf = conf.containsInConf(selfId); |
| if (followers.isEmpty() && !includeSelf) { |
| return Optional.empty(); |
| } |
| |
| final long[] indicesInNewConf = getSorted(followers, includeSelf, followerIndex, logIndex); |
| final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf, gapThreshold); |
| |
| if (!conf.isTransitional()) { |
| return Optional.of(newConf); |
| } else { // configuration is in transitional state |
| final List<FollowerInfo> oldFollowers = infos.getOld(); |
| final boolean includeSelfInOldConf = conf.containsInOldConf(selfId); |
| if (oldFollowers.isEmpty() && !includeSelfInOldConf) { |
| return Optional.empty(); |
| } |
| |
| final long[] indicesInOldConf = getSorted(oldFollowers, includeSelfInOldConf, followerIndex, logIndex); |
| final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf, gapThreshold); |
| return Optional.of(newConf.combine(oldConf)); |
| } |
| } |
| |
| private boolean hasMajority(Predicate<RaftPeerId> isAcked) { |
| final RaftPeerId selfId = server.getId(); |
| return server.getRaftConf().hasMajority(isAcked, selfId); |
| } |
| |
| private void updateCommit(LogEntryHeader[] entriesToCommit) { |
| final long newCommitIndex = raftLog.getLastCommittedIndex(); |
| logMetadata(newCommitIndex); |
| commitIndexChanged(); |
| |
| boolean hasConfiguration = false; |
| for (LogEntryHeader entry : entriesToCommit) { |
| if (entry.getIndex() > newCommitIndex) { |
| break; |
| } |
| hasConfiguration |= entry.getLogEntryBodyCase() == LogEntryBodyCase.CONFIGURATIONENTRY; |
| raftLog.getRaftLogMetrics().onLogEntryCommitted(entry); |
| } |
| if (hasConfiguration) { |
| checkAndUpdateConfiguration(); |
| } |
| } |
| |
| private void updateCommit(long majority, long min) { |
| final long oldLastCommitted = raftLog.getLastCommittedIndex(); |
| if (majority > oldLastCommitted) { |
| // Get the headers before updating commit index since the log can be purged after a snapshot |
| final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1); |
| |
| if (server.getState().updateCommitIndex(majority, currentTerm, true)) { |
| updateCommit(entriesToCommit); |
| } |
| } |
| watchRequests.update(ReplicationLevel.ALL, min); |
| } |
| |
| private void logMetadata(long commitIndex) { |
| raftLog.appendMetadata(currentTerm, commitIndex); |
| notifySenders(); |
| } |
| |
| private void checkAndUpdateConfiguration() { |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| if (conf.isTransitional()) { |
| replicateNewConf(); |
| } else { // the (new) log entry has been committed |
| pendingRequests.replySetConfiguration(server::newSuccessReply); |
| // if the leader is not included in the current configuration, step down |
| if (!conf.containsInConf(server.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) { |
| lease.getAndSetEnabled(false); |
| LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf); |
| try { |
| // leave some time for all RPC senders to send out new conf entry |
| server.properties().minRpcTimeout().sleep(); |
| } catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| // the pending request handler will send NotLeaderException for |
| // pending client requests when it stops |
| server.close(); |
| } |
| } |
| } |
| |
| /** |
| * when the (old, new) log entry has been committed, should replicate (new): |
| * 1) append (new) to log |
| * 2) update conf to (new) |
| * 3) update RpcSenders list |
| * 4) start replicating the log entry |
| */ |
| private void replicateNewConf() { |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| final RaftConfigurationImpl newConf = RaftConfigurationImpl.newBuilder() |
| .setConf(conf) |
| .setLogEntryIndex(raftLog.getNextIndex()) |
| .build(); |
| // stop the LogAppender if the corresponding follower and listener is no longer in the conf |
| updateSenders(newConf); |
| appendConfiguration(newConf); |
| notifySenders(); |
| } |
| |
| private long[] getSorted(List<FollowerInfo> followerInfos, boolean includeSelf, |
| ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) { |
| final int length = includeSelf ? followerInfos.size() + 1 : followerInfos.size(); |
| if (length == 0) { |
| throw new IllegalArgumentException("followerInfos is empty and includeSelf == " + includeSelf); |
| } |
| |
| final long[] indices = new long[length]; |
| for (int i = 0; i < followerInfos.size(); i++) { |
| indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i)); |
| } |
| |
| if (includeSelf) { |
| // note that we also need to wait for the local disk I/O |
| indices[length - 1] = getLogIndex.getAsLong(); |
| } |
| |
| Arrays.sort(indices); |
| return indices; |
| } |
| |
| private void checkPeersForYieldingLeader() { |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| final RaftPeer leader = conf.getPeer(server.getId()); |
| if (leader == null) { |
| LOG.error("{} the leader {} is not in the conf {}", this, server.getId(), conf); |
| return; |
| } |
| final int leaderPriority = leader.getPriority(); |
| |
| final List<LogAppender> highestPriorityInfos = new ArrayList<>(); |
| int highestPriority = Integer.MIN_VALUE; |
| for (LogAppender logAppender : senders) { |
| final RaftPeer follower = conf.getPeer(logAppender.getFollowerId()); |
| if (follower == null) { |
| continue; |
| } |
| final int followerPriority = follower.getPriority(); |
| if (followerPriority > leaderPriority && followerPriority >= highestPriority) { |
| if (followerPriority > highestPriority) { |
| highestPriority = followerPriority; |
| highestPriorityInfos.clear(); |
| } |
| highestPriorityInfos.add(logAppender); |
| } |
| } |
| final TermIndex leaderLastEntry = server.getState().getLastEntry(); |
| final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry); |
| if (appender != null) { |
| server.getTransferLeadership().start(appender); |
| } |
| } |
| |
| /** |
| * See the thesis section 6.2: A leader in Raft steps down |
| * if an election timeout elapses without a successful |
| * round of heartbeats to a majority of its cluster. |
| */ |
| |
| public boolean checkLeadership() { |
| if (!server.getRole().getLeaderState().filter(leader -> leader == this).isPresent()) { |
| return false; |
| } |
| // The initial value of lastRpcResponseTime in FollowerInfo is set by |
| // LeaderState::addSenders(), which is fake and used to trigger an |
| // immediate round of AppendEntries request. Since candidates collect |
| // votes from majority before becoming leader, without seeing higher term, |
| // ideally, A leader is legal for election timeout if become leader soon. |
| if (server.getRole().getRoleElapsedTimeMs() < server.getMaxTimeoutMs()) { |
| return true; |
| } |
| |
| final List<RaftPeerId> activePeers = getLogAppenders() |
| .filter(sender -> sender.getFollower() |
| .getLastRpcResponseTime() |
| .elapsedTimeMs() <= server.getMaxTimeoutMs()) |
| .map(LogAppender::getFollowerId) |
| .collect(Collectors.toList()); |
| |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| |
| if (conf.hasMajority(activePeers, server.getId())) { |
| // leadership check passed |
| return true; |
| } |
| |
| LOG.warn(this + ": Lost leadership on term: " + currentTerm |
| + ". Election timeout: " + server.getMaxTimeoutMs() + "ms" |
| + ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms" |
| + ". Conf: " + conf); |
| getLogAppenders().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f)); |
| |
| // step down as follower |
| stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS); |
| return false; |
| } |
| |
| /** |
| * Obtain the current readIndex for read only requests. See Raft paper section 6.4. |
| * 1. Leader makes sure at least one log from current term is committed. |
| * 2. Leader record last committed index as readIndex. |
| * 3. Leader broadcast heartbeats to followers and waits for acknowledgements. |
| * 4. If majority respond success, returns readIndex. |
| * @return current readIndex. |
| */ |
| CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) { |
| final long readIndex; |
| if (readAfterWriteConsistentIndex != null) { |
| readIndex = readAfterWriteConsistentIndex; |
| } else { |
| readIndex = server.getRaftLog().getLastCommittedIndex(); |
| } |
| LOG.debug("readIndex={}, readAfterWriteConsistentIndex={}", readIndex, readAfterWriteConsistentIndex); |
| |
| // if group contains only one member, fast path |
| if (server.getRaftConf().isSingleton()) { |
| return CompletableFuture.completedFuture(readIndex); |
| } |
| |
| // leader has not committed any entries in this term |
| if (!isReady()) { |
| return startupLogEntry.get().getAppliedIndexFuture(); |
| } |
| |
| // if lease is enabled, check lease first |
| if (hasLease()) { |
| return CompletableFuture.completedFuture(readIndex); |
| } |
| |
| // send heartbeats and wait for majority acknowledgments |
| final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener( |
| readIndex, i -> new AppendEntriesListener(i, senders)); |
| |
| // the readIndex is already acknowledged before |
| if (listener == null) { |
| return CompletableFuture.completedFuture(readIndex); |
| } |
| |
| return listener.getFuture(); |
| } |
| |
| @Override |
| public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesReplyProto reply) { |
| readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority); |
| } |
| |
| boolean getAndSetLeaseEnabled(boolean newValue) { |
| return lease.getAndSetEnabled(newValue); |
| } |
| |
| boolean hasLease() { |
| if (!lease.isEnabled()) { |
| return false; |
| } |
| |
| if (checkLeaderLease()) { |
| return true; |
| } |
| |
| // try extending the leader lease |
| final RaftConfigurationImpl conf = server.getRaftConf(); |
| final CurrentOldFollowerInfos info = followerInfoMap.getFollowerInfos(conf); |
| lease.extend(info.getCurrent(), info.getOld(), peers -> conf.hasMajority(peers, server.getId())); |
| |
| return checkLeaderLease(); |
| } |
| |
| private boolean checkLeaderLease() { |
| return isRunning() && isReady() |
| && (server.getRaftConf().isSingleton() || lease.isValid()); |
| } |
| |
| void replyPendingRequest(long logIndex, RaftClientReply reply) { |
| pendingRequests.replyPendingRequest(logIndex, reply); |
| } |
| |
| TransactionContext getTransactionContext(long index) { |
| return pendingRequests.getTransactionContext(index); |
| } |
| |
| long[] getFollowerNextIndices() { |
| return getLogAppenders().mapToLong(s -> s.getFollower().getNextIndex()).toArray(); |
| } |
| |
| static Map<RaftPeerId, RaftPeer> newMap(Collection<RaftPeer> peers, String str) { |
| Objects.requireNonNull(peers, () -> str + " == null"); |
| final Map<RaftPeerId, RaftPeer> map = new HashMap<>(); |
| for(RaftPeer p : peers) { |
| map.put(p.getId(), p); |
| } |
| return Collections.unmodifiableMap(map); |
| } |
| |
| private class ConfigurationStagingState { |
| private final String name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| private final Map<RaftPeerId, RaftPeer> newPeers; |
| private final Map<RaftPeerId, RaftPeer> newListeners; |
| private final PeerConfiguration newConf; |
| |
| ConfigurationStagingState(Collection<RaftPeer> newPeers, Collection<RaftPeer> newListeners, |
| PeerConfiguration newConf) { |
| this.newPeers = newMap(newPeers, "peer"); |
| this.newListeners = newMap(newListeners, "listeners"); |
| this.newConf = newConf; |
| } |
| |
| RaftConfigurationImpl generateOldNewConf(RaftConfigurationImpl current, long logIndex) { |
| return RaftConfigurationImpl.newBuilder() |
| .setConf(newConf) |
| .setOldConf(current) |
| .setLogEntryIndex(logIndex) |
| .build(); |
| } |
| |
| Collection<RaftPeer> getNewPeers() { |
| return newPeers.values(); |
| } |
| |
| Collection<RaftPeer> getNewListeners() { |
| return newListeners.values(); |
| } |
| |
| boolean contains(RaftPeerId peerId) { |
| return newPeers.containsKey(peerId) || newListeners.containsKey(peerId); |
| } |
| |
| void fail(BootStrapProgress progress) { |
| final String message = this + ": Fail to set configuration " + newConf + " due to " + progress; |
| LOG.debug(message); |
| stopAndRemoveSenders(s -> !isCaughtUp(s.getFollower())); |
| |
| stagingState = null; |
| // send back failure response to client's request |
| pendingRequests.failSetConfiguration(new ReconfigurationTimeoutException(message)); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| /** |
| * @return the RaftPeer (address and id) information of the followers. |
| */ |
| Stream<RaftPeer> getFollowers() { |
| return getLogAppenders() |
| .map(sender -> sender.getFollower().getPeer()) |
| .filter(peer -> server.getRaftConf().containsInConf(peer.getId())); |
| } |
| |
| Stream<LogAppender> getLogAppenders() { |
| return StreamSupport.stream(senders.spliterator(), false); |
| } |
| |
| Optional<LogAppender> getLogAppender(RaftPeerId id) { |
| return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny(); |
| } |
| |
| private static boolean isCaughtUp(FollowerInfo follower) { |
| return ((FollowerInfoImpl)follower).isCaughtUp(); |
| } |
| |
| @Override |
| public void checkHealth(FollowerInfo follower) { |
| final TimeDuration elapsedTime = follower.getLastRpcResponseTime().elapsedTime(); |
| if (elapsedTime.compareTo(server.properties().rpcSlownessTimeout()) > 0) { |
| final RoleInfoProto leaderInfo = server.getInfo().getRoleInfoProto(); |
| server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo); |
| server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo, follower.getPeer()); |
| } |
| final RaftPeerId followerId = follower.getId(); |
| raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS)); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |