| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.raft.jraft.core; |
| |
| import com.lmax.disruptor.EventHandler; |
| import com.lmax.disruptor.EventTranslator; |
| import com.lmax.disruptor.RingBuffer; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import org.apache.ignite.lang.IgniteLogger; |
| import org.apache.ignite.raft.client.Peer; |
| import org.apache.ignite.raft.jraft.Closure; |
| import org.apache.ignite.raft.jraft.FSMCaller; |
| import org.apache.ignite.raft.jraft.JRaftServiceFactory; |
| import org.apache.ignite.raft.jraft.Node; |
| import org.apache.ignite.raft.jraft.ReadOnlyService; |
| import org.apache.ignite.raft.jraft.ReplicatorGroup; |
| import org.apache.ignite.raft.jraft.Status; |
| import org.apache.ignite.raft.jraft.closure.CatchUpClosure; |
| import org.apache.ignite.raft.jraft.closure.ClosureQueue; |
| import org.apache.ignite.raft.jraft.closure.ClosureQueueImpl; |
| import org.apache.ignite.raft.jraft.closure.ReadIndexClosure; |
| import org.apache.ignite.raft.jraft.closure.SynchronizedClosure; |
| import org.apache.ignite.raft.jraft.conf.Configuration; |
| import org.apache.ignite.raft.jraft.conf.ConfigurationEntry; |
| import org.apache.ignite.raft.jraft.conf.ConfigurationManager; |
| import org.apache.ignite.raft.jraft.disruptor.GroupAware; |
| import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor; |
| import org.apache.ignite.raft.jraft.entity.Ballot; |
| import org.apache.ignite.raft.jraft.entity.EnumOutter; |
| import org.apache.ignite.raft.jraft.entity.LeaderChangeContext; |
| import org.apache.ignite.raft.jraft.entity.LogEntry; |
| import org.apache.ignite.raft.jraft.entity.LogId; |
| import org.apache.ignite.raft.jraft.entity.NodeId; |
| import org.apache.ignite.raft.jraft.entity.PeerId; |
| import org.apache.ignite.raft.jraft.entity.RaftOutter; |
| import org.apache.ignite.raft.jraft.entity.Task; |
| import org.apache.ignite.raft.jraft.entity.UserLog; |
| import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException; |
| import org.apache.ignite.raft.jraft.error.LogNotFoundException; |
| import org.apache.ignite.raft.jraft.error.RaftError; |
| import org.apache.ignite.raft.jraft.error.RaftException; |
| import org.apache.ignite.raft.jraft.option.BallotBoxOptions; |
| import org.apache.ignite.raft.jraft.option.BootstrapOptions; |
| import org.apache.ignite.raft.jraft.option.FSMCallerOptions; |
| import org.apache.ignite.raft.jraft.option.LogManagerOptions; |
| import org.apache.ignite.raft.jraft.option.NodeOptions; |
| import org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions; |
| import org.apache.ignite.raft.jraft.option.RaftOptions; |
| import org.apache.ignite.raft.jraft.option.ReadOnlyOption; |
| import org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions; |
| import org.apache.ignite.raft.jraft.option.ReplicatorGroupOptions; |
| import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions; |
| import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseBuilder; |
| import org.apache.ignite.raft.jraft.rpc.Message; |
| import org.apache.ignite.raft.jraft.rpc.RaftClientService; |
| import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory; |
| import org.apache.ignite.raft.jraft.rpc.RaftServerService; |
| import org.apache.ignite.raft.jraft.rpc.ReadIndexResponseBuilder; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.RequestVoteRequest; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.RequestVoteResponse; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowResponse; |
| import org.apache.ignite.raft.jraft.rpc.RpcResponseClosure; |
| import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter; |
| import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService; |
| import org.apache.ignite.raft.jraft.storage.LogManager; |
| import org.apache.ignite.raft.jraft.storage.LogStorage; |
| import org.apache.ignite.raft.jraft.storage.RaftMetaStorage; |
| import org.apache.ignite.raft.jraft.storage.SnapshotExecutor; |
| import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl; |
| import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl; |
| import org.apache.ignite.raft.jraft.util.ByteString; |
| import org.apache.ignite.raft.jraft.util.Describer; |
| import org.apache.ignite.raft.jraft.util.DisruptorMetricSet; |
| import org.apache.ignite.raft.jraft.util.OnlyForTest; |
| import org.apache.ignite.raft.jraft.util.RepeatedTimer; |
| import org.apache.ignite.raft.jraft.util.Requires; |
| import org.apache.ignite.raft.jraft.util.StringUtils; |
| import org.apache.ignite.raft.jraft.util.SystemPropertyUtil; |
| import org.apache.ignite.raft.jraft.util.ThreadHelper; |
| import org.apache.ignite.raft.jraft.util.ThreadId; |
| import org.apache.ignite.raft.jraft.util.Utils; |
| import org.apache.ignite.raft.jraft.util.concurrent.LongHeldDetectingReadWriteLock; |
| import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory; |
| |
| /** |
| * The raft replica node implementation. |
| */ |
| public class NodeImpl implements Node, RaftServerService { |
| private static final IgniteLogger LOG = IgniteLogger.forClass(NodeImpl.class); |
| |
| // Max retry times when applying tasks. |
| private static final int MAX_APPLY_RETRY_TIMES = 3; |
| |
| /** |
| * Internal states |
| */ |
| private final ReadWriteLock readWriteLock = new NodeReadWriteLock( |
| this); |
| protected final Lock writeLock = this.readWriteLock |
| .writeLock(); |
| protected final Lock readLock = this.readWriteLock |
| .readLock(); |
| private volatile State state; |
| private volatile CountDownLatch shutdownLatch; |
| private long currTerm; |
| private volatile long lastLeaderTimestamp; |
| private PeerId leaderId = new PeerId(); |
| private PeerId votedId; |
| private final Ballot voteCtx = new Ballot(); |
| private final Ballot prevVoteCtx = new Ballot(); |
| private ConfigurationEntry conf; |
| private StopTransferArg stopTransferArg; |
| /** |
| * Raft group and node options and identifier |
| */ |
| private final String groupId; |
| private NodeOptions options; |
| private RaftOptions raftOptions; |
| private final PeerId serverId; |
| |
| private Peer peer; |
| |
| /** |
| * Other services |
| */ |
| private final ConfigurationCtx confCtx; |
| private LogStorage logStorage; |
| private RaftMetaStorage metaStorage; |
| private ClosureQueue closureQueue; |
| private ConfigurationManager configManager; |
| private LogManager logManager; |
| private FSMCaller fsmCaller; |
| private BallotBox ballotBox; |
| private SnapshotExecutor snapshotExecutor; |
| private ReplicatorGroup replicatorGroup; |
| private final List<Closure> shutdownContinuations = new ArrayList<>(); |
| private RaftClientService rpcClientService; |
| private ReadOnlyService readOnlyService; |
| /** |
| * Timers |
| */ |
| private Scheduler timerManager; |
| private RepeatedTimer electionTimer; |
| private RepeatedTimer voteTimer; |
| /** |
| * Triggered on a leader each electionTimeoutMs / 2 milliseconds to ensure the alive quorum. |
| */ |
| private RepeatedTimer stepDownTimer; |
| |
| /** Triggers a snapshot every snapshotIntervalSecs seconds. */ |
| private RepeatedTimer snapshotTimer; |
| private ScheduledFuture<?> transferTimer; |
| private ThreadId wakingCandidate; |
| /** |
| * Disruptor to run node service |
| */ |
| private StripedDisruptor<LogEntryAndClosure> applyDisruptor; |
| private RingBuffer<LogEntryAndClosure> applyQueue; |
| |
| /** |
| * Metrics |
| */ |
| private NodeMetrics metrics; |
| |
| private NodeId nodeId; |
| private JRaftServiceFactory serviceFactory; |
| |
| /** |
| * ReplicatorStateListeners |
| */ |
| private final CopyOnWriteArrayList<Replicator.ReplicatorStateListener> replicatorStateListeners = |
| new CopyOnWriteArrayList<>(); |
| /** |
| * Node's target leader election priority value |
| */ |
| private volatile int targetPriority; |
| /** |
| * The number of election timeouts that have occurred on the current node. |
| */ |
| private volatile int electionTimeoutCounter; |
| |
| /** |
| * Timer factory. |
| */ |
| private RaftTimerFactory timerFactory; |
| |
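| /** |
| * Node read-write lock that reports, via a warning log and a node metric, when acquiring the lock is blocked |
| * for longer than the threshold configured by {@code jraft.node.detecting.lock.max_blocking_ms_to_report}. |
| */ |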
| private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock { |
| static final long MAX_BLOCKING_MS_TO_REPORT = SystemPropertyUtil.getLong( |
| "jraft.node.detecting.lock.max_blocking_ms_to_report", -1); |
| |
| private final Node node; |
| |
| NodeReadWriteLock(final Node node) { |
| super(MAX_BLOCKING_MS_TO_REPORT, TimeUnit.MILLISECONDS); |
| this.node = node; |
| } |
| |
| @Override |
| public void report(final AcquireMode acquireMode, final Thread heldThread, |
| final Collection<Thread> queuedThreads, final long blockedNanos) { |
| final long blockedMs = TimeUnit.NANOSECONDS.toMillis(blockedNanos); |
| LOG.warn( |
| "Raft-Node-Lock report: currentThread={}, acquireMode={}, heldThread={}, queuedThreads={}, blockedMs={}.", |
| Thread.currentThread(), acquireMode, heldThread, queuedThreads, blockedMs); |
| |
| final NodeMetrics metrics = this.node.getNodeMetrics(); |
| if (metrics != null) { |
| metrics.recordLatency("node-lock-blocked", blockedMs); |
| } |
| } |
| } |
| |
| /** |
| * Node service event. |
| */ |
| public static class LogEntryAndClosure implements GroupAware { |
| /** Raft group id. */ |
| String groupId; |
| |
| LogEntry entry; |
| Closure done; |
| long expectedTerm; |
| CountDownLatch shutdownLatch; |
| |
| /** {@inheritDoc} */ |
| @Override public String groupId() { |
| return groupId; |
| } |
| |
| public void reset() { |
| this.groupId = null; |
| this.entry = null; |
| this.done = null; |
| this.expectedTerm = 0; |
| this.shutdownLatch = null; |
| } |
| } |
| |
| /** |
| * Event handler. |
| */ |
| private class LogEntryAndClosureHandler implements EventHandler<LogEntryAndClosure> { |
| // task list for batch |
| private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch()); |
| |
| @Override |
| public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch) throws Exception { |
| if (event.shutdownLatch != null) { |
| if (!this.tasks.isEmpty()) { |
| executeApplyingTasks(this.tasks); |
| this.tasks.clear(); |
| } |
| event.shutdownLatch.countDown(); |
| return; |
| } |
| |
| this.tasks.add(event); |
| if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) { |
| executeApplyingTasks(this.tasks); |
| this.tasks.clear(); |
| } |
| } |
| } |
| |
| /** |
| * Configuration commit context. |
| */ |
| private static class ConfigurationCtx { |
| enum Stage { |
| STAGE_NONE, // none stage |
| STAGE_CATCHING_UP, // the node is catching-up |
| STAGE_JOINT, // joint stage |
| STAGE_STABLE // stable stage |
| } |
| |
| final NodeImpl node; |
| Stage stage; |
| // Number of peer changes. |
| int nchanges; |
| long version; |
| // peers |
| List<PeerId> newPeers = new ArrayList<>(); |
| List<PeerId> oldPeers = new ArrayList<>(); |
| List<PeerId> addingPeers = new ArrayList<>(); |
| // learners |
| List<PeerId> newLearners = new ArrayList<>(); |
| List<PeerId> oldLearners = new ArrayList<>(); |
| Closure done; |
| |
| ConfigurationCtx(final NodeImpl node) { |
| super(); |
| this.node = node; |
| this.stage = Stage.STAGE_NONE; |
| this.version = 0; |
| this.done = null; |
| } |
| |
| /** |
| * Starts a configuration change. |
| */ |
| void start(final Configuration oldConf, final Configuration newConf, final Closure done) { |
| if (isBusy()) { |
| if (done != null) { |
| Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), done, new Status(RaftError.EBUSY, "Already in busy stage.")); |
| } |
| throw new IllegalStateException("Busy stage"); |
| } |
| if (this.done != null) { |
| if (done != null) { |
| Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), done, new Status(RaftError.EINVAL, "Already have done closure.")); |
| } |
| throw new IllegalArgumentException("Already have done closure"); |
| } |
| this.done = done; |
| this.stage = Stage.STAGE_CATCHING_UP; |
| this.oldPeers = oldConf.listPeers(); |
| this.newPeers = newConf.listPeers(); |
| this.oldLearners = oldConf.listLearners(); |
| this.newLearners = newConf.listLearners(); |
| final Configuration adding = new Configuration(); |
| final Configuration removing = new Configuration(); |
| newConf.diff(oldConf, adding, removing); |
| this.nchanges = adding.size() + removing.size(); |
| |
| addNewLearners(); |
| if (adding.isEmpty()) { |
| nextStage(); |
| return; |
| } |
| addNewPeers(adding); |
| } |
| |
| private void addNewPeers(final Configuration adding) { |
| this.addingPeers = adding.listPeers(); |
| LOG.info("Adding peers: {}.", this.addingPeers); |
| for (final PeerId newPeer : this.addingPeers) { |
| if (!this.node.replicatorGroup.addReplicator(newPeer)) { |
| LOG.error("Node {} start the replicator failed, peer={}.", this.node.getNodeId(), newPeer); |
| onCaughtUp(this.version, newPeer, false); |
| return; |
| } |
| final OnCaughtUp caughtUp = new OnCaughtUp(this.node, this.node.currTerm, newPeer, this.version); |
| final long dueTime = Utils.nowMs() + this.node.options.getElectionTimeoutMs(); |
| if (!this.node.replicatorGroup.waitCaughtUp(newPeer, this.node.options.getCatchupMargin(), dueTime, |
| caughtUp)) { |
| LOG.error("Node {} waitCaughtUp, peer={}.", this.node.getNodeId(), newPeer); |
| onCaughtUp(this.version, newPeer, false); |
| return; |
| } |
| } |
| } |
| |
| private void addNewLearners() { |
| final Set<PeerId> addingLearners = new HashSet<>(this.newLearners); |
| addingLearners.removeAll(this.oldLearners); |
| LOG.info("Adding learners: {}.", addingLearners); |
| for (final PeerId newLearner : addingLearners) { |
| if (!this.node.replicatorGroup.addReplicator(newLearner, ReplicatorType.Learner)) { |
| LOG.error("Node {} start the learner replicator failed, peer={}.", this.node.getNodeId(), |
| newLearner); |
| } |
| } |
| } |
| |
| void onCaughtUp(final long version, final PeerId peer, final boolean success) { |
| if (version != this.version) { |
| LOG.warn("Ignore onCaughtUp message, mismatch configuration context version, expect {}, but is {}.", |
| this.version, version); |
| return; |
| } |
| Requires.requireTrue(this.stage == Stage.STAGE_CATCHING_UP, "Stage is not in STAGE_CATCHING_UP"); |
| if (success) { |
| this.addingPeers.remove(peer); |
| if (this.addingPeers.isEmpty()) { |
| nextStage(); |
| return; |
| } |
| return; |
| } |
| LOG.warn("Node {} fail to catch up peer {} when trying to change peers from {} to {}.", |
| this.node.getNodeId(), peer, this.oldPeers, this.newPeers); |
| reset(new Status(RaftError.ECATCHUP, "Peer %s failed to catch up.", peer)); |
| } |
| |
| void reset() { |
| reset(null); |
| } |
| |
| void reset(final Status st) { |
| if (st != null && st.isOk()) { |
| this.node.stopReplicator(this.newPeers, this.oldPeers); |
| this.node.stopReplicator(this.newLearners, this.oldLearners); |
| } |
| else { |
| this.node.stopReplicator(this.oldPeers, this.newPeers); |
| this.node.stopReplicator(this.oldLearners, this.newLearners); |
| } |
| clearPeers(); |
| clearLearners(); |
| |
| this.version++; |
| this.stage = Stage.STAGE_NONE; |
| this.nchanges = 0; |
| if (this.done != null) { |
| Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), this.done, st != null ? st : |
| new Status(RaftError.EPERM, "Leader stepped down.")); |
| this.done = null; |
| } |
| } |
| |
| private void clearLearners() { |
| this.newLearners.clear(); |
| this.oldLearners.clear(); |
| } |
| |
| private void clearPeers() { |
| this.newPeers.clear(); |
| this.oldPeers.clear(); |
| this.addingPeers.clear(); |
| } |
| |
| /** |
| * Invoked when this node becomes the leader; writes a configuration change entry as the first log entry of the new term. |
| */ |
| void flush(final Configuration conf, final Configuration oldConf) { |
| Requires.requireTrue(!isBusy(), "Flush when busy"); |
| this.newPeers = conf.listPeers(); |
| this.newLearners = conf.listLearners(); |
| if (oldConf == null || oldConf.isEmpty()) { |
| this.stage = Stage.STAGE_STABLE; |
| this.oldPeers = this.newPeers; |
| this.oldLearners = this.newLearners; |
| } |
| else { |
| this.stage = Stage.STAGE_JOINT; |
| this.oldPeers = oldConf.listPeers(); |
| this.oldLearners = oldConf.listLearners(); |
| } |
| this.node.unsafeApplyConfiguration(conf, oldConf == null || oldConf.isEmpty() ? null : oldConf, true); |
| } |
| |
| void nextStage() { |
| Requires.requireTrue(isBusy(), "Not in busy stage"); |
| switch (this.stage) { |
| case STAGE_CATCHING_UP: |
| if (this.nchanges > 0) { |
| this.stage = Stage.STAGE_JOINT; |
| this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), |
| new Configuration(this.oldPeers), false); |
| return; |
| } |
| // fallthrough. |
| case STAGE_JOINT: |
| this.stage = Stage.STAGE_STABLE; |
| this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), null, false); |
| break; |
| case STAGE_STABLE: |
| final boolean shouldStepDown = !this.newPeers.contains(this.node.serverId); |
| reset(new Status()); |
| if (shouldStepDown) { |
| this.node.stepDown(this.node.currTerm, true, new Status(RaftError.ELEADERREMOVED, |
| "This node was removed.")); |
| } |
| break; |
| case STAGE_NONE: |
| // noinspection ConstantConditions |
| Requires.requireTrue(false, "Can't reach here"); |
| break; |
| } |
| } |
| |
| boolean isBusy() { |
| return this.stage != Stage.STAGE_NONE; |
| } |
| } |
| |
| public NodeImpl(final String groupId, final PeerId serverId) { |
| super(); |
| if (groupId != null) { |
| Utils.verifyGroupId(groupId); |
| } |
| this.groupId = groupId; |
| this.serverId = serverId != null ? serverId.copy() : null; |
| this.state = State.STATE_UNINITIALIZED; |
| this.currTerm = 0; |
| updateLastLeaderTimestamp(Utils.monotonicMs()); |
| this.confCtx = new ConfigurationCtx(this); |
| this.wakingCandidate = null; |
| } |
| |
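| /** |
| * Initializes the snapshot executor. Skipped (returning {@code true}) when no snapshot uri is configured. |
| */ |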
| private boolean initSnapshotStorage() { |
| if (StringUtils.isEmpty(this.options.getSnapshotUri())) { |
| LOG.warn("Snapshot uri is not set, ignoring initSnapshotStorage."); |
| return true; |
| } |
| this.snapshotExecutor = new SnapshotExecutorImpl(); |
| final SnapshotExecutorOptions opts = new SnapshotExecutorOptions(); |
| opts.setUri(this.options.getSnapshotUri()); |
| opts.setFsmCaller(this.fsmCaller); |
| opts.setNode(this); |
| opts.setLogManager(this.logManager); |
| opts.setAddr(this.serverId != null ? this.serverId.getEndpoint() : null); |
| opts.setInitTerm(this.currTerm); |
| opts.setFilterBeforeCopyRemote(this.options.isFilterBeforeCopyRemote()); |
| // get snapshot throttle |
| opts.setSnapshotThrottle(this.options.getSnapshotThrottle()); |
| return this.snapshotExecutor.init(opts); |
| } |
| |
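| /** |
| * Initializes the log storage and the log manager built on top of it. |
| */ |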
| private boolean initLogStorage() { |
| Requires.requireNonNull(this.fsmCaller, "Null fsm caller"); |
| this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions); |
| this.logManager = new LogManagerImpl(); |
| final LogManagerOptions opts = new LogManagerOptions(); |
| opts.setGroupId(groupId); |
| opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory()); |
| opts.setLogStorage(this.logStorage); |
| opts.setConfigurationManager(this.configManager); |
| opts.setNode(this); |
| opts.setFsmCaller(this.fsmCaller); |
| opts.setNodeMetrics(this.metrics); |
| opts.setRaftOptions(this.raftOptions); |
| opts.setLogManagerDisruptor(options.getLogManagerDisruptor()); |
| |
| return this.logManager.init(opts); |
| } |
| |
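| /** |
| * Initializes the raft meta storage and restores the persisted term and voted-for peer from it. |
| */ |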
| private boolean initMetaStorage() { |
| this.metaStorage = this.serviceFactory.createRaftMetaStorage(this.options.getRaftMetaUri(), this.raftOptions); |
| RaftMetaStorageOptions opts = new RaftMetaStorageOptions(); |
| opts.setNode(this); |
| if (!this.metaStorage.init(opts)) { |
| LOG.error("Node {} init meta storage failed, uri={}.", this.serverId, this.options.getRaftMetaUri()); |
| return false; |
| } |
| this.currTerm = this.metaStorage.getTerm(); |
| this.votedId = this.metaStorage.getVotedFor().copy(); |
| return true; |
| } |
| |
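| /** |
| * Invoked by the snapshot timer: triggers a snapshot in the common executor, unless the node is no longer active. |
| */ |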
| private void handleSnapshotTimeout() { |
| this.writeLock.lock(); |
| try { |
| if (!this.state.isActive()) { |
| return; |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| // do_snapshot in another thread to avoid blocking the timer thread. |
| Utils.runInThread(this.getOptions().getCommonExecutor(), () -> doSnapshot(null)); |
| } |
| |
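| /** |
| * Invoked by the election timer on a follower: if the current leader's lease has expired, resets the leader |
| * and starts a pre-vote, unless the priority rules disallow launching an election. |
| */ |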
| private void handleElectionTimeout() { |
| boolean doUnlock = true; |
| this.writeLock.lock(); |
| try { |
| if (this.state != State.STATE_FOLLOWER) { |
| return; |
| } |
| if (isCurrentLeaderValid()) { |
| return; |
| } |
| resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.", |
| this.leaderId)); |
| |
| // Decide whether to launch an election. |
| if (!allowLaunchElection()) { |
| return; |
| } |
| |
| doUnlock = false; |
| preVote(); |
| |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| } |
| } |
| |
| /** |
| * Decides whether this node is allowed to launch an election by comparing its priority with the current target |
| * priority. If no leader is elected before the next election timeout, the node decays its local target priority |
| * exponentially. |
| * |
| * @return {@code true} if the current node is allowed to launch an election. |
| */ |
| @SuppressWarnings("NonAtomicOperationOnVolatileField") |
| private boolean allowLaunchElection() { |
| |
| // Priority 0 is a special value so that a node will never participate in election. |
| if (this.serverId.isPriorityNotElected()) { |
| LOG.warn("Node {} will never participate in election, because it's priority={}.", getNodeId(), |
| this.serverId.getPriority()); |
| return false; |
| } |
| |
| // If priority-based election is disabled for this node, it can always launch an election. |
| if (this.serverId.isPriorityDisabled()) { |
| return true; |
| } |
| |
| // If the current node's priority is less than the target priority, it does not initiate a leader |
| // election and waits for the next election timeout. |
| if (this.serverId.getPriority() < this.targetPriority) { |
| this.electionTimeoutCounter++; |
| |
| // If no leader is elected before the next election timeout, the node |
| // decays its local target priority exponentially. |
| if (this.electionTimeoutCounter > 1) { |
| decayTargetPriority(); |
| this.electionTimeoutCounter = 0; |
| } |
| |
| if (this.electionTimeoutCounter == 1) { |
| LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.", |
| getNodeId()); |
| return false; |
| } |
| } |
| |
| return this.serverId.getPriority() >= this.targetPriority; |
| } |
| |
| /** |
| * Decay targetPriority value based on gap value. |
| */ |
| @SuppressWarnings("NonAtomicOperationOnVolatileField") |
| private void decayTargetPriority() { |
| // The decay gap is never smaller than 10. |
| final int decayPriorityGap = Math.max(this.options.getDecayPriorityGap(), 10); |
| final int gap = Math.max(decayPriorityGap, (this.targetPriority / 5)); |
| |
| final int prevTargetPriority = this.targetPriority; |
| this.targetPriority = Math.max(ElectionPriority.MinValue, (this.targetPriority - gap)); |
| LOG.info("Node {} priority decay, from: {}, to: {}.", getNodeId(), prevTargetPriority, this.targetPriority); |
| } |
| |
| /** |
| * Checks and sets the configuration for this node. If the configuration has changed, recomputes and updates the |
| * target priority value. |
| * |
| * @param inLock whether the write lock is already held by the caller. |
| */ |
| private void checkAndSetConfiguration(final boolean inLock) { |
| if (!inLock) { |
| this.writeLock.lock(); |
| } |
| try { |
| final ConfigurationEntry prevConf = this.conf; |
| this.conf = this.logManager.checkAndSetConfiguration(prevConf); |
| |
| if (this.conf != prevConf) { |
| // Update target priority value |
| final int prevTargetPriority = this.targetPriority; |
| this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers()); |
| if (prevTargetPriority != this.targetPriority) { |
| LOG.info("Node {} target priority value has changed from: {}, to: {}.", getNodeId(), |
| prevTargetPriority, this.targetPriority); |
| } |
| this.electionTimeoutCounter = 0; |
| } |
| } |
| finally { |
| if (!inLock) { |
| this.writeLock.unlock(); |
| } |
| } |
| } |
| |
| /** |
| * Returns the max priority value among all nodes in the same Raft group. |
| * |
| * @param peerIds peer nodes in the same Raft group |
| * @return the maximum priority value of the given peers |
| */ |
| private int getMaxPriorityOfNodes(final List<PeerId> peerIds) { |
| Requires.requireNonNull(peerIds, "Null peer list"); |
| |
| int maxPriority = Integer.MIN_VALUE; |
| for (final PeerId peerId : peerIds) { |
| final int priorityVal = peerId.getPriority(); |
| maxPriority = Math.max(priorityVal, maxPriority); |
| } |
| |
| return maxPriority; |
| } |
| |
| private boolean initFSMCaller(final LogId bootstrapId) { |
| if (this.fsmCaller == null) { |
| LOG.error("Fail to init fsm caller, null instance, bootstrapId={}.", bootstrapId); |
| return false; |
| } |
| this.closureQueue = new ClosureQueueImpl(this.getOptions()); |
| final FSMCallerOptions opts = new FSMCallerOptions(); |
| opts.setAfterShutdown(status -> afterShutdown()); |
| opts.setLogManager(this.logManager); |
| opts.setFsm(this.options.getFsm()); |
| opts.setClosureQueue(this.closureQueue); |
| opts.setNode(this); |
| opts.setBootstrapId(bootstrapId); |
| opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory()); |
| opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor()); |
| opts.setGroupId(groupId); |
| |
| return this.fsmCaller.init(opts); |
| } |
| |
| private static class BootstrapStableClosure extends LogManager.StableClosure { |
| |
| private final SynchronizedClosure done = new SynchronizedClosure(); |
| |
| BootstrapStableClosure() { |
| super(null); |
| } |
| |
| public Status await() throws InterruptedException { |
| return this.done.await(); |
| } |
| |
| @Override |
| public void run(final Status status) { |
| this.done.run(status); |
| } |
| } |
| |
| public boolean bootstrap(final BootstrapOptions opts) throws InterruptedException { |
| if (opts.getLastLogIndex() > 0 && (opts.getGroupConf().isEmpty() || opts.getFsm() == null)) { |
| LOG.error("Invalid arguments for bootstrap, groupConf={}, fsm={}, lastLogIndex={}.", opts.getGroupConf(), |
| opts.getFsm(), opts.getLastLogIndex()); |
| return false; |
| } |
| if (opts.getGroupConf().isEmpty()) { |
| LOG.error("Bootstrapping an empty node makes no sense."); |
| return false; |
| } |
| Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory"); |
| this.serviceFactory = opts.getServiceFactory(); |
| // Term is not an option since changing it is very dangerous |
| final long bootstrapLogTerm = opts.getLastLogIndex() > 0 ? 1 : 0; |
| final LogId bootstrapId = new LogId(opts.getLastLogIndex(), bootstrapLogTerm); |
| this.options = opts.getNodeOptions() == null ? new NodeOptions() : opts.getNodeOptions(); |
| this.raftOptions = this.options.getRaftOptions(); |
| this.metrics = new NodeMetrics(opts.isEnableMetrics()); |
| this.options.setFsm(opts.getFsm()); |
| this.options.setLogUri(opts.getLogUri()); |
| this.options.setRaftMetaUri(opts.getRaftMetaUri()); |
| this.options.setSnapshotUri(opts.getSnapshotUri()); |
| |
| this.configManager = new ConfigurationManager(); |
| // Create fsmCaller first, because logManager needs it to report errors. |
| this.fsmCaller = new FSMCallerImpl(); |
| |
| if (!initLogStorage()) { |
| LOG.error("Fail to init log storage."); |
| return false; |
| } |
| if (!initMetaStorage()) { |
| LOG.error("Fail to init meta storage."); |
| return false; |
| } |
| if (this.currTerm == 0) { |
| this.currTerm = 1; |
| if (!this.metaStorage.setTermAndVotedFor(1, new PeerId())) { |
| LOG.error("Fail to set term."); |
| return false; |
| } |
| } |
| |
| if (opts.getFsm() != null && !initFSMCaller(bootstrapId)) { |
| LOG.error("Fail to init fsm caller."); |
| return false; |
| } |
| |
| final LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); |
| entry.getId().setTerm(this.currTerm); |
| entry.setPeers(opts.getGroupConf().listPeers()); |
| entry.setLearners(opts.getGroupConf().listLearners()); |
| |
| final List<LogEntry> entries = new ArrayList<>(); |
| entries.add(entry); |
| |
| final BootstrapStableClosure bootstrapDone = new BootstrapStableClosure(); |
| this.logManager.appendEntries(entries, bootstrapDone); |
| if (!bootstrapDone.await().isOk()) { |
| LOG.error("Fail to append configuration."); |
| return false; |
| } |
| |
| if (opts.getLastLogIndex() > 0) { |
| if (!initSnapshotStorage()) { |
| LOG.error("Fail to init snapshot storage."); |
| return false; |
| } |
| final SynchronizedClosure snapshotDone = new SynchronizedClosure(); |
| this.snapshotExecutor.doSnapshot(snapshotDone); |
| if (!snapshotDone.await().isOk()) { |
| LOG.error("Fail to save snapshot, status={}.", snapshotDone.getStatus()); |
| return false; |
| } |
| } |
| |
| if (this.logManager.getFirstLogIndex() != opts.getLastLogIndex() + 1) { |
| throw new IllegalStateException("First and last log index mismatch"); |
| } |
| if (opts.getLastLogIndex() > 0) { |
| if (this.logManager.getLastLogIndex() != opts.getLastLogIndex()) { |
| throw new IllegalStateException("Last log index mismatch"); |
| } |
| } |
| else { |
| if (this.logManager.getLastLogIndex() != opts.getLastLogIndex() + 1) { |
| throw new IllegalStateException("Last log index mismatch"); |
| } |
| } |
| |
| return true; |
| } |
| |
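| /** |
| * Computes the heartbeat timeout as the election timeout divided by the election heartbeat factor, but never |
| * less than 10 ms. |
| */ |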
| private int heartbeatTimeout(final int electionTimeout) { |
| return Math.max(electionTimeout / this.raftOptions.getElectionHeartbeatFactor(), 10); |
| } |
| |
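| /** |
| * Returns a randomized timeout in the range {@code [timeoutMs, timeoutMs + maxElectionDelayMs)}. |
| */ |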
| private int randomTimeout(final int timeoutMs) { |
| return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + this.raftOptions.getMaxElectionDelayMs()); |
| } |
| |
| @Override |
| public boolean init(final NodeOptions opts) { |
| Requires.requireNonNull(opts, "Null node options"); |
| Requires.requireNonNull(opts.getRaftOptions(), "Null raft options"); |
| Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory"); |
| this.serviceFactory = opts.getServiceFactory(); |
| this.options = opts; |
| this.raftOptions = opts.getRaftOptions(); |
| this.metrics = new NodeMetrics(opts.isEnableMetrics()); |
| this.serverId.setPriority(opts.getElectionPriority()); |
| this.electionTimeoutCounter = 0; |
| this.timerFactory = opts.getServiceFactory().createRaftTimerFactory(); |
| if (opts.getReplicationStateListeners() != null) |
| this.replicatorStateListeners.addAll(opts.getReplicationStateListeners()); |
| |
| if (this.serverId.getIp().equals(Utils.IP_ANY)) { |
| LOG.error("Node can't start from IP_ANY."); |
| return false; |
| } |
| |
| // Init timers |
| final String suffix = getOptions().getServerName() + "-"; |
| |
| timerManager = getOptions().getScheduler() == null ? timerFactory.createScheduler(this.options.getTimerPoolSize(), |
| "JRaft-Node-ScheduleThreadPool-" + suffix) : getOptions().getScheduler(); |
| |
| String name = "JRaft-VoteTimer-" + suffix; |
| this.voteTimer = new RepeatedTimer(name, options.getElectionTimeoutMs(), timerFactory.getVoteTimer(name)) { |
| @Override |
| protected void onTrigger() { |
| handleVoteTimeout(); |
| } |
| |
| @Override |
| protected int adjustTimeout(final int timeoutMs) { |
| return randomTimeout(timeoutMs); |
| } |
| }; |
| |
| name = "JRaft-ElectionTimer-" + suffix; |
| electionTimer = new RepeatedTimer(name, options.getElectionTimeoutMs(), timerFactory.getElectionTimer(name)) { |
| @Override |
| protected void onTrigger() { |
| handleElectionTimeout(); |
| } |
| |
| @Override |
| protected int adjustTimeout(final int timeoutMs) { |
| return randomTimeout(timeoutMs); |
| } |
| }; |
| |
| name = "JRaft-StepDownTimer-" + suffix; |
| stepDownTimer = new RepeatedTimer(name, options.getElectionTimeoutMs() >> 1, timerFactory.getStepDownTimer(name)) { |
| @Override |
| protected void onTrigger() { |
| handleStepDownTimeout(); |
| } |
| }; |
| |
| name = "JRaft-SnapshotTimer-" + suffix; |
| snapshotTimer = new RepeatedTimer(name, options.getSnapshotIntervalSecs() * 1000, timerFactory.getSnapshotTimer(name)) { |
| private volatile boolean firstSchedule = true; |
| |
| @Override |
| protected void onTrigger() { |
| handleSnapshotTimeout(); |
| } |
| |
| @Override |
| protected int adjustTimeout(final int timeoutMs) { |
| if (!this.firstSchedule) { |
| return timeoutMs; |
| } |
| |
| // Randomize the first snapshot trigger timeout |
| this.firstSchedule = false; |
| if (timeoutMs > 0) { |
| int half = timeoutMs / 2; |
| return half + ThreadLocalRandom.current().nextInt(half); |
| } |
| else { |
| return timeoutMs; |
| } |
| } |
| }; |
| |
| this.configManager = new ConfigurationManager(); |
| |
| applyDisruptor = opts.getNodeApplyDisruptor(); |
| |
| applyQueue = applyDisruptor.subscribe(groupId, new LogEntryAndClosureHandler()); |
| |
| if (this.metrics.getMetricRegistry() != null) { |
| this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor", |
| new DisruptorMetricSet(this.applyQueue)); |
| } |
| |
| this.fsmCaller = new FSMCallerImpl(); |
| if (!initLogStorage()) { |
| LOG.error("Node {} initLogStorage failed.", getNodeId()); |
| return false; |
| } |
| if (!initMetaStorage()) { |
| LOG.error("Node {} initMetaStorage failed.", getNodeId()); |
| return false; |
| } |
| if (!initFSMCaller(new LogId(0, 0))) { |
| LOG.error("Node {} initFSMCaller failed.", getNodeId()); |
| return false; |
| } |
| this.ballotBox = new BallotBox(); |
| final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions(); |
| ballotBoxOpts.setWaiter(this.fsmCaller); |
| ballotBoxOpts.setClosureQueue(this.closureQueue); |
| if (!this.ballotBox.init(ballotBoxOpts)) { |
| LOG.error("Node {} init ballotBox failed.", getNodeId()); |
| return false; |
| } |
| |
| if (!initSnapshotStorage()) { |
| LOG.error("Node {} initSnapshotStorage failed.", getNodeId()); |
| return false; |
| } |
| |
| final Status st = this.logManager.checkConsistency(); |
| if (!st.isOk()) { |
| LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st); |
| return false; |
| } |
| this.conf = new ConfigurationEntry(); |
| this.conf.setId(new LogId()); |
| // If there are log entries, use the configuration from the log; otherwise use the configuration from the options. |
| if (this.logManager.getLastLogIndex() > 0) { |
| checkAndSetConfiguration(false); |
| } |
| else { |
| this.conf.setConf(this.options.getInitialConf()); |
| // initially set to max(priority of all nodes) |
| this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers()); |
| } |
| |
| if (!this.conf.isEmpty()) { |
| Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf); |
| } |
| else { |
| LOG.info("Init node {} with empty conf.", this.serverId); |
| } |
| |
| this.replicatorGroup = new ReplicatorGroupImpl(); |
| this.rpcClientService = new DefaultRaftClientService(); |
| final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions(); |
| rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs())); |
| rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs()); |
| rgOpts.setLogManager(this.logManager); |
| rgOpts.setBallotBox(this.ballotBox); |
| rgOpts.setNode(this); |
| rgOpts.setRaftRpcClientService(this.rpcClientService); |
| rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null); |
| rgOpts.setRaftOptions(this.raftOptions); |
| rgOpts.setTimerManager(this.timerManager); |
| |
| // Adds metric registry to RPC service. |
| this.options.setMetricRegistry(this.metrics.getMetricRegistry()); |
| |
| if (!this.rpcClientService.init(this.options)) { |
| LOG.error("Fail to init rpc service."); |
| return false; |
| } |
| this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts); |
| |
| this.readOnlyService = new ReadOnlyServiceImpl(); |
| final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions(); |
| rosOpts.setGroupId(groupId); |
| rosOpts.setFsmCaller(this.fsmCaller); |
| rosOpts.setNode(this); |
| rosOpts.setRaftOptions(this.raftOptions); |
| rosOpts.setReadOnlyServiceDisruptor(opts.getReadOnlyServiceDisruptor()); |
| |
| if (!this.readOnlyService.init(rosOpts)) { |
| LOG.error("Fail to init readOnlyService."); |
| return false; |
| } |
| |
| // set state to follower |
| this.state = State.STATE_FOLLOWER; |
| |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm, |
| this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf()); |
| } |
| |
| if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) { |
| LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm); |
| this.snapshotTimer.start(); |
| } |
| |
| if (!this.conf.isEmpty()) { |
| stepDown(this.currTerm, false, new Status()); |
| } |
| |
| // Now the raft node is started; we have to acquire the write lock to avoid race |
| // conditions. |
| this.writeLock.lock(); |
| if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) { |
| // The group contains only this server, which must become the LEADER; trigger |
| // the election immediately. |
| electSelf(); |
| } |
| else { |
| this.writeLock.unlock(); |
| } |
| |
| return true; |
| } |
| |
| @OnlyForTest |
| void tryElectSelf() { |
| this.writeLock.lock(); |
| // unlock in electSelf |
| electSelf(); |
| } |
| |
| /** |
| * Starts an election: becomes a candidate, increments the term, votes for itself and sends RequestVote |
| * requests to all other peers. Must be entered with the write lock held; the lock is released before returning. |
| */ |
| private void electSelf() { |
| long oldTerm; |
| try { |
| LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm); |
| if (!this.conf.contains(this.serverId)) { |
| LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf); |
| return; |
| } |
| if (this.state == State.STATE_FOLLOWER) { |
| LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm); |
| this.electionTimer.stop(); |
| } |
| resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, |
| "A follower's leader_id is reset to NULL as it begins to request_vote.")); |
| this.state = State.STATE_CANDIDATE; |
| this.currTerm++; |
| this.votedId = this.serverId.copy(); |
| LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm); |
| this.voteTimer.start(); |
| this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); |
| oldTerm = this.currTerm; |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| |
| final LogId lastLogId = this.logManager.getLastLogId(true); |
| |
| this.writeLock.lock(); |
| try { |
| // Guard against the ABA problem: the term may have changed while the write lock was released. |
| if (oldTerm != this.currTerm) { |
| LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm); |
| return; |
| } |
| for (final PeerId peer : this.conf.listPeers()) { |
| if (peer.equals(this.serverId)) { |
| continue; |
| } |
| if (!this.rpcClientService.connect(peer.getEndpoint())) { |
| LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint()); |
| continue; |
| } |
| final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this); |
| done.request = raftOptions.getRaftMessagesFactory() |
| .requestVoteRequest() |
| .preVote(false) // It's not a pre-vote request. |
| .groupId(this.groupId) |
| .serverId(this.serverId.toString()) |
| .peerId(peer.toString()) |
| .term(this.currTerm) |
| .lastLogIndex(lastLogId.getIndex()) |
| .lastLogTerm(lastLogId.getTerm()) |
| .build(); |
| this.rpcClientService.requestVote(peer.getEndpoint(), done.request, done); |
| } |
| |
| this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId); |
| this.voteCtx.grant(this.serverId); |
| if (this.voteCtx.isGranted()) { |
| becomeLeader(); |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
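| /** |
| * Resets the current leader id. An empty {@code newLeaderId} clears the leader and notifies the state machine |
| * via {@code onStopFollowing} if this node was following one; a non-empty id records the new leader and fires |
| * {@code onStartFollowing} if no leader was known before. |
| */ |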
| private void resetLeaderId(final PeerId newLeaderId, final Status status) { |
| if (newLeaderId.isEmpty()) { |
| if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) { |
| this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status)); |
| } |
| this.leaderId = PeerId.emptyPeer(); |
| } |
| else { |
| if (this.leaderId == null || this.leaderId.isEmpty()) { |
| this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status)); |
| } |
| this.leaderId = newLeaderId.copy(); |
| } |
| } |
| |
| /** |
| * Steps down when a message from a (possibly new) leader is received: on a higher term, when this node is not |
| * a follower at the same term, or when no leader is known yet; then records the sender as the current leader |
| * if none is set. Should be called with the write lock held. |
| */ |
| private void checkStepDown(final long requestTerm, final PeerId serverId) { |
| final Status status = new Status(); |
| if (requestTerm > this.currTerm) { |
| status.setError(RaftError.ENEWLEADER, "Raft node receives message from new leader with higher term."); |
| stepDown(requestTerm, false, status); |
| } |
| else if (this.state != State.STATE_FOLLOWER) { |
| status.setError(RaftError.ENEWLEADER, "Candidate receives message from new leader with the same term."); |
| stepDown(requestTerm, false, status); |
| } |
| else if (this.leaderId.isEmpty()) { |
| status.setError(RaftError.ENEWLEADER, "Follower receives message from new leader with the same term."); |
| stepDown(requestTerm, false, status); |
| } |
| // save current leader |
| if (this.leaderId == null || this.leaderId.isEmpty()) { |
| resetLeaderId(serverId, status); |
| } |
| } |
| |
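| /** |
| * Transitions this candidate to leader: stops the vote timer, resets the replicator group term, starts |
| * replicators for all peers and learners, resets the ballot box pending index and appends the current |
| * configuration as the first log entry of the new term. |
| */ |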
| private void becomeLeader() { |
| Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state); |
| LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm, |
| this.conf.getConf(), this.conf.getOldConf()); |
| // cancel candidate vote timer |
| stopVoteTimer(); |
| this.state = State.STATE_LEADER; |
| this.leaderId = this.serverId.copy(); |
| this.replicatorGroup.resetTerm(this.currTerm); |
| // Start follower's replicators |
| for (final PeerId peer : this.conf.listPeers()) { |
| if (peer.equals(this.serverId)) { |
| continue; |
| } |
| LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer); |
| if (!this.replicatorGroup.addReplicator(peer)) { |
| LOG.error("Fail to add a replicator, peer={}.", peer); |
| } |
| } |
| |
| // Start learner's replicators |
| for (final PeerId peer : this.conf.listLearners()) { |
| LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer); |
| if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) { |
| LOG.error("Fail to add a learner replicator, peer={}.", peer); |
| } |
| } |
| |
| // init commit manager |
| this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1); |
| // Register _conf_ctx to reject configuration changing before the first log |
| // is committed. |
| if (this.confCtx.isBusy()) { |
| throw new IllegalStateException(); |
| } |
| this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf()); |
| this.stepDownTimer.start(); |
| } |
| |
| /** |
| * Steps down to follower at the given term: stops leader or candidate activity, resets the leader id, persists |
| * the new term if it is higher, stops replicators (optionally waking up the next candidate) and restarts the |
| * election timer unless this node is a learner. Should be called with the write lock held. |
| */ |
| private void stepDown(final long term, final boolean wakeupCandidate, final Status status) { |
| LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term, |
| wakeupCandidate); |
| if (!this.state.isActive()) { |
| return; |
| } |
| if (this.state == State.STATE_CANDIDATE) { |
| stopVoteTimer(); |
| } |
| else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) { |
| stopStepDownTimer(); |
| this.ballotBox.clearPendingTasks(); |
| // signal fsm leader stop immediately |
| if (this.state == State.STATE_LEADER) { |
| onLeaderStop(status); |
| } |
| } |
| // reset leader_id |
| resetLeaderId(PeerId.emptyPeer(), status); |
| |
| // soft state in memory |
| this.state = State.STATE_FOLLOWER; |
| this.confCtx.reset(); |
| updateLastLeaderTimestamp(Utils.monotonicMs()); |
| if (this.snapshotExecutor != null) { |
| this.snapshotExecutor.interruptDownloadingSnapshots(term); |
| } |
| |
| // meta state |
| if (term > this.currTerm) { |
| this.currTerm = term; |
| this.votedId = PeerId.emptyPeer(); |
| this.metaStorage.setTermAndVotedFor(term, this.votedId); |
| } |
| |
| if (wakeupCandidate) { |
| this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf); |
| if (this.wakingCandidate != null) { |
| Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs()); |
| } |
| } |
| else { |
| this.replicatorGroup.stopAll(); |
| } |
| if (this.stopTransferArg != null) { |
| if (this.transferTimer != null) { |
| this.transferTimer.cancel(true); |
| } |
| // There is at most one StopTransferTimer per term, so it is safe to |
| // reset stopTransferArg to null. |
| this.stopTransferArg = null; |
| } |
| // Learner node will not trigger the election timer. |
| if (!isLearner()) { |
| this.electionTimer.restart(); |
| } |
| else { |
| LOG.info("Node {} is a learner, election timer is not started.", this.nodeId); |
| } |
| } |
| |
| // Should be in readLock |
| private boolean isLearner() { |
| return this.conf.listLearners().contains(this.serverId); |
| } |
| |
| private void stopStepDownTimer() { |
| if (this.stepDownTimer != null) { |
| this.stepDownTimer.stop(); |
| } |
| } |
| |
| private void stopVoteTimer() { |
| if (this.voteTimer != null) { |
| this.voteTimer.stop(); |
| } |
| } |
| |
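| /** |
| * Closure invoked when a batch of log entries appended by the leader becomes stable locally; on success the |
| * range is committed in the ballot box on behalf of this server. |
| */ |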
| class LeaderStableClosure extends LogManager.StableClosure { |
| LeaderStableClosure(final List<LogEntry> entries) { |
| super(entries); |
| } |
| |
| @Override |
| public void run(final Status status) { |
| if (status.isOk()) { |
| NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1, |
| NodeImpl.this.serverId); |
| } |
| else { |
| LOG.error("Node {} append [{}, {}] failed, status={}.", getNodeId(), this.firstLogIndex, |
| this.firstLogIndex + this.nEntries - 1, status); |
| } |
| } |
| } |
| |
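| /** |
| * Applies a batch of tasks as the leader: rejects the whole batch if this node is no longer the leader, skips |
| * tasks whose expected term does not match the current term, registers the rest in the ballot box and appends |
| * them to the log. |
| */ |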
| private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) { |
| this.writeLock.lock(); |
| try { |
| final int size = tasks.size(); |
| if (this.state != State.STATE_LEADER) { |
| final Status st = new Status(); |
| if (this.state != State.STATE_TRANSFERRING) { |
| st.setError(RaftError.EPERM, "Is not leader."); |
| } |
| else { |
| st.setError(RaftError.EBUSY, "Is transferring leadership."); |
| } |
| LOG.debug("Node {} can't apply, status={}.", getNodeId(), st); |
| final List<LogEntryAndClosure> savedTasks = new ArrayList<>(tasks); |
| Utils.runInThread(this.getOptions().getCommonExecutor(), () -> { |
| for (int i = 0; i < size; i++) { |
| savedTasks.get(i).done.run(st); |
| } |
| }); |
| return; |
| } |
| final List<LogEntry> entries = new ArrayList<>(size); |
| for (int i = 0; i < size; i++) { |
| final LogEntryAndClosure task = tasks.get(i); |
| if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) { |
| LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(), |
| task.expectedTerm, this.currTerm); |
| if (task.done != null) { |
| final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d", |
| task.expectedTerm, this.currTerm); |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, st); |
| } |
| continue; |
| } |
| if (!this.ballotBox.appendPendingTask(this.conf.getConf(), |
| this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, new Status(RaftError.EINTERNAL, "Fail to append task.")); |
| continue; |
| } |
| // set task entry info before adding to list. |
| task.entry.getId().setTerm(this.currTerm); |
| task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA); |
| entries.add(task.entry); |
| } |
| this.logManager.appendEntries(entries, new LeaderStableClosure(entries)); |
| // update conf.first |
| checkAndSetConfiguration(true); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns the node metrics. |
| * |
| * @return returns metrics of current node. |
| */ |
| @Override |
| public NodeMetrics getNodeMetrics() { |
| return this.metrics; |
| } |
| |
| /** |
| * Returns the JRaft service factory for current node. |
| * |
| * @return the service factory |
| */ |
| public JRaftServiceFactory getServiceFactory() { |
| return this.serviceFactory; |
| } |
| |
| @Override |
| public void readIndex(final byte[] requestContext, final ReadIndexClosure done) { |
| if (this.shutdownLatch != null) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.")); |
| throw new IllegalStateException("Node is shutting down"); |
| } |
| Requires.requireNonNull(done, "Null closure"); |
| this.readOnlyService.addRequest(requestContext, done); |
| } |
| |
| /** |
| * ReadIndex response closure |
| */ |
| private static class ReadIndexHeartbeatResponseClosure extends RpcResponseClosureAdapter<AppendEntriesResponse> { |
| final ReadIndexResponseBuilder respBuilder; |
| final RpcResponseClosure<ReadIndexResponse> closure; |
| final int quorum; |
| final int failPeersThreshold; |
| int ackSuccess; |
| int ackFailures; |
| boolean isDone; |
| |
| ReadIndexHeartbeatResponseClosure(final RpcResponseClosure<ReadIndexResponse> closure, |
| final ReadIndexResponseBuilder rb, final int quorum, |
| final int peersCount) { |
| super(); |
| this.closure = closure; |
| this.respBuilder = rb; |
| this.quorum = quorum; |
| this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum; |
| this.ackSuccess = 0; |
| this.ackFailures = 0; |
| this.isDone = false; |
| } |
| |
| @Override |
| public synchronized void run(final Status status) { |
| if (this.isDone) { |
| return; |
| } |
| if (status.isOk() && getResponse().success()) { |
| this.ackSuccess++; |
| } |
| else { |
| this.ackFailures++; |
| } |
| // Plus one to count the leader's own success vote. |
| if (this.ackSuccess + 1 >= this.quorum) { |
| this.respBuilder.success(true); |
| this.closure.setResponse(this.respBuilder.build()); |
| this.closure.run(Status.OK()); |
| this.isDone = true; |
| } |
| else if (this.ackFailures >= this.failPeersThreshold) { |
| this.respBuilder.success(false); |
| this.closure.setResponse(this.respBuilder.build()); |
| this.closure.run(Status.OK()); |
| this.isDone = true; |
| } |
| } |
| } |
| |
| /** |
| * Handle read index request. |
| */ |
| @Override |
| public void handleReadIndexRequest(final ReadIndexRequest request, |
| final RpcResponseClosure<ReadIndexResponse> done) { |
| final long startMs = Utils.monotonicMs(); |
| this.readLock.lock(); |
| try { |
| switch (this.state) { |
| case STATE_LEADER: |
| readLeader(request, done); |
| break; |
| case STATE_FOLLOWER: |
| readFollower(request, done); |
| break; |
| case STATE_TRANSFERRING: |
| done.run(new Status(RaftError.EBUSY, "Is transferring leadership.")); |
| break; |
| default: |
| done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state)); |
| break; |
| } |
| } |
| finally { |
| this.readLock.unlock(); |
| this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs); |
| this.metrics.recordSize("handle-read-index-entries", Utils.size(request.entriesList())); |
| } |
| } |
| |
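| /** |
| * Returns the quorum (majority) size of the current configuration, or 0 if the configuration is empty. |
| */ |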
| private int getQuorum() { |
| final Configuration c = this.conf.getConf(); |
| if (c.isEmpty()) { |
| return 0; |
| } |
| return c.getPeers().size() / 2 + 1; |
| } |
| |
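| /** |
| * Forwards a ReadIndex request received on a follower to the current leader; fails if no leader is known. |
| */ |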
| private void readFollower(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> closure) { |
| if (this.leaderId == null || this.leaderId.isEmpty()) { |
| closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm)); |
| return; |
| } |
| // send request to leader. |
| final ReadIndexRequest newRequest = raftOptions.getRaftMessagesFactory() |
| .readIndexRequest() |
| .groupId(request.groupId()) |
| .serverId(request.serverId()) |
| .peerId(request.peerId()) |
| .entriesList(request.entriesList()) |
| .peerId(this.leaderId.toString()) |
| .build(); |
| this.rpcClientService.readIndex(this.leaderId.getEndpoint(), newRequest, -1, closure); |
| } |
| |
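| /** |
| * Handles a ReadIndex request on the leader: confirms leadership either by a heartbeat round to a quorum of |
| * peers (ReadOnlySafe) or by the leader lease (ReadOnlyLeaseBased), and replies with the last committed index. |
| * The request is rejected until the leader has committed at least one entry in its current term. |
| */ |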
| private void readLeader(ReadIndexRequest request, RpcResponseClosure<ReadIndexResponse> closure) { |
| ReadIndexResponseBuilder respBuilder = raftOptions.getRaftMessagesFactory().readIndexResponse(); |
| |
| final int quorum = getQuorum(); |
| if (quorum <= 1) { |
| // Only one peer, fast path. |
| respBuilder |
| .success(true) |
| .index(this.ballotBox.getLastCommittedIndex()); |
| closure.setResponse(respBuilder.build()); |
| closure.run(Status.OK()); |
| return; |
| } |
| |
| final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex(); |
| if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) { |
| // Reject read only request when this leader has not committed any log entry at its term |
| closure |
| .run(new Status( |
| RaftError.EAGAIN, |
| "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.", |
| lastCommittedIndex, this.currTerm)); |
| return; |
| } |
| respBuilder.index(lastCommittedIndex); |
| |
| if (request.peerId() != null) { |
| // request from follower or learner, check if the follower/learner is in current conf. |
| final PeerId peer = new PeerId(); |
| peer.parse(request.serverId()); |
| if (!this.conf.contains(peer) && !this.conf.containsLearner(peer)) { |
| closure |
| .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer, this.conf)); |
| return; |
| } |
| } |
| |
| ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions(); |
| if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) { |
| // If the leader lease has expired, we must fall back to ReadOnlySafe. |
| readOnlyOpt = ReadOnlyOption.ReadOnlySafe; |
| } |
| |
| switch (readOnlyOpt) { |
| case ReadOnlySafe: |
| final List<PeerId> peers = this.conf.getConf().getPeers(); |
| Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers"); |
| final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure, |
| respBuilder, quorum, peers.size()); |
| // Send heartbeat requests to followers |
| for (final PeerId peer : peers) { |
| if (peer.equals(this.serverId)) { |
| continue; |
| } |
| this.replicatorGroup.sendHeartbeat(peer, heartbeatDone); |
| } |
| break; |
| case ReadOnlyLeaseBased: |
| // Respond to followers and the local node directly. |
| respBuilder.success(true); |
| closure.setResponse(respBuilder.build()); |
| closure.run(Status.OK()); |
| break; |
| } |
| } |
| |
| @Override |
| public void apply(final Task task) { |
| if (this.shutdownLatch != null) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down.")); |
| throw new IllegalStateException("Node is shutting down"); |
| } |
| Requires.requireNonNull(task, "Null task"); |
| |
| final LogEntry entry = new LogEntry(); |
| entry.setData(task.getData()); |
| int retryTimes = 0; |
| try { |
| final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> { |
| event.reset(); |
| event.groupId = groupId; |
| event.done = task.getDone(); |
| event.entry = entry; |
| event.expectedTerm = task.getExpectedTerm(); |
| }; |
| while (true) { |
| if (this.applyQueue.tryPublishEvent(translator)) { |
| break; |
| } |
| else { |
| retryTimes++; |
| if (retryTimes > MAX_APPLY_RETRY_TIMES) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), |
| new Status(RaftError.EBUSY, "Node is busy, has too many tasks.")); |
| LOG.warn("Node {} applyQueue is overload.", getNodeId()); |
| this.metrics.recordTimes("apply-task-overload-times", 1); |
| return; |
| } |
| ThreadHelper.onSpinWait(); |
| } |
| } |
| |
| } |
| catch (final Exception e) { |
| LOG.error("Fail to apply task.", e); |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.EPERM, "Node is down.")); |
| } |
| } |
| |
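    /**
     * Handles a pre-vote request. The pre-vote is rejected if the candidate is not in the current
     * configuration, if the current leader's lease is still valid, or if the request term is stale;
     * otherwise it is granted when the candidate's log is at least as up-to-date as the local log.
     */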
| @Override |
| public Message handlePreVoteRequest(final RequestVoteRequest request) { |
| boolean doUnlock = true; |
| this.writeLock.lock(); |
| try { |
| if (!this.state.isActive()) { |
| LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); |
| } |
| final PeerId candidateId = new PeerId(); |
| if (!candidateId.parse(request.serverId())) { |
| LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(), |
| request.serverId()); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Parse candidateId failed: %s.", request.serverId()); |
| } |
| boolean granted = false; |
| // noinspection ConstantConditions |
| do { |
| if (!this.conf.contains(candidateId)) { |
| LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(), |
| request.serverId(), this.conf); |
| break; |
| } |
| if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) { |
| LOG.info( |
| "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.", |
| getNodeId(), request.serverId(), request.term(), this.currTerm, this.leaderId); |
| break; |
| } |
| if (request.term() < this.currTerm) { |
| LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(), |
| request.serverId(), request.term(), this.currTerm); |
                    // A follower's replicator may not have been started when this node became the leader, so we must check it.
| checkReplicator(candidateId); |
| break; |
| } |
                // A follower's replicator may not have been started when this node became the leader,
                // so check the replicator state here.
| checkReplicator(candidateId); |
| |
| doUnlock = false; |
| this.writeLock.unlock(); |
| |
| final LogId lastLogId = this.logManager.getLastLogId(true); |
| |
| doUnlock = true; |
| this.writeLock.lock(); |
| final LogId requestLastLogId = new LogId(request.lastLogIndex(), request.lastLogTerm()); |
| granted = requestLastLogId.compareTo(lastLogId) >= 0; |
| |
| LOG.info( |
| "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.", |
| getNodeId(), request.serverId(), request.term(), this.currTerm, granted, requestLastLogId, |
| lastLogId); |
| } |
| while (false); |
| |
| return raftOptions.getRaftMessagesFactory() |
| .requestVoteResponse() |
| .term(this.currTerm) |
| .granted(granted) |
| .build(); |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| } |
| } |
| |
    // Must be called while holding the read lock.
| private boolean isLeaderLeaseValid() { |
| final long monotonicNowMs = Utils.monotonicMs(); |
| if (checkLeaderLease(monotonicNowMs)) { |
| return true; |
| } |
| checkDeadNodes0(this.conf.getConf().getPeers(), monotonicNowMs, false, null); |
| return checkLeaderLease(monotonicNowMs); |
| } |
| |
| private boolean checkLeaderLease(final long monotonicNowMs) { |
| return monotonicNowMs - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs(); |
| } |
| |
| private boolean isCurrentLeaderValid() { |
| return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs(); |
| } |
| |
| private void updateLastLeaderTimestamp(final long lastLeaderTimestamp) { |
| this.lastLeaderTimestamp = lastLeaderTimestamp; |
| } |
| |
| private void checkReplicator(final PeerId candidateId) { |
| if (this.state == State.STATE_LEADER) { |
| this.replicatorGroup.checkReplicator(candidateId, false); |
| } |
| } |
| |
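    /**
     * Handles a request-vote request: steps down on a higher term, rejects stale terms, and grants the vote
     * only if the candidate's log is at least as up-to-date as the local log and this node has not yet voted
     * in the current term.
     */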
| @Override |
| public Message handleRequestVoteRequest(final RequestVoteRequest request) { |
| boolean doUnlock = true; |
| this.writeLock.lock(); |
| try { |
| if (!this.state.isActive()) { |
| LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); |
| } |
| final PeerId candidateId = new PeerId(); |
| if (!candidateId.parse(request.serverId())) { |
| LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(), |
| request.serverId()); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Parse candidateId failed: %s.", request.serverId()); |
| } |
| |
| // noinspection ConstantConditions |
| do { |
| // check term |
| if (request.term() >= this.currTerm) { |
| LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), |
| request.serverId(), request.term(), this.currTerm); |
| // increase current term, change state to follower |
| if (request.term() > this.currTerm) { |
| stepDown(request.term(), false, new Status(RaftError.EHIGHERTERMRESPONSE, |
| "Raft node receives higher term RequestVoteRequest.")); |
| } |
| } |
| else { |
| // ignore older term |
| LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), |
| request.serverId(), request.term(), this.currTerm); |
| break; |
| } |
| doUnlock = false; |
| this.writeLock.unlock(); |
| |
| final LogId lastLogId = this.logManager.getLastLogId(true); |
| |
| doUnlock = true; |
| this.writeLock.lock(); |
                // The vote needs an ABA check after releasing and re-acquiring the write lock.
| if (request.term() != this.currTerm) { |
| LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); |
| break; |
| } |
| |
| final boolean logIsOk = new LogId(request.lastLogIndex(), request.lastLogTerm()) |
| .compareTo(lastLogId) >= 0; |
| |
| if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) { |
| stepDown(request.term(), false, new Status(RaftError.EVOTEFORCANDIDATE, |
| "Raft node votes for some candidate, step down to restart election_timer.")); |
| this.votedId = candidateId.copy(); |
| this.metaStorage.setVotedFor(candidateId); |
| } |
| } |
| while (false); |
| |
| return raftOptions.getRaftMessagesFactory() |
| .requestVoteResponse() |
| .term(this.currTerm) |
| .granted(request.term() == this.currTerm && candidateId.equals(this.votedId)) |
| .build(); |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| } |
| } |
| |
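    /**
     * Closure invoked once the appended entries have been stably stored on the follower. It replies to the
     * leader and advances the local committed index to min(leader's committed index, prevLogIndex + number of
     * appended entries), unless the term has changed in the meantime.
     */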
| private static class FollowerStableClosure extends LogManager.StableClosure { |
| final long committedIndex; |
| final AppendEntriesResponseBuilder responseBuilder; |
| final NodeImpl node; |
| final RpcRequestClosure done; |
| final long term; |
| |
| FollowerStableClosure(final AppendEntriesRequest request, |
| final AppendEntriesResponseBuilder responseBuilder, final NodeImpl node, |
| final RpcRequestClosure done, final long term) { |
| super(null); |
| this.committedIndex = Math.min( |
                // The committed index is likely less than the lastLogIndex.
                request.committedIndex(),
                // The logs after the appended entries cannot be trusted, so we can't commit them
                // even if their indexes are less than the request's committed index.
| request.prevLogIndex() + Utils.size(request.entriesList())); |
| this.responseBuilder = responseBuilder; |
| this.node = node; |
| this.done = done; |
| this.term = term; |
| } |
| |
| @Override |
| public void run(final Status status) { |
| if (!status.isOk()) { |
| this.done.run(status); |
| return; |
| } |
| |
| this.node.readLock.lock(); |
| try { |
| if (this.term != this.node.currTerm) { |
                    // The change of term indicates that the leader has changed while the entries were
                    // being appended, so we can't respond OK to the old leader because we don't know
                    // whether the appended logs will be truncated by the new leader:
                    // - If they won't be truncated and we respond failure to the old leader, the new
                    //   leader will know that they are stored on this peer, and they will eventually be
                    //   committed once the new leader finds that a quorum of the cluster has stored them.
                    // - If they will be truncated and we responded success to the old leader, the old
                    //   leader would possibly regard those entries as committed (very likely in a
                    //   3-node cluster) and respond success to the clients, which would break the rule
                    //   that committed entries must never be truncated.
                    // So we have to respond failure to the old leader and report the new term to make
                    // it step down if it hasn't already.
| // TODO asch make test scenario https://issues.apache.org/jira/browse/IGNITE-14832 |
| this.responseBuilder.success(false).term(this.node.currTerm); |
| this.done.sendResponse(this.responseBuilder.build()); |
| return; |
| } |
| } |
| finally { |
| // It's safe to release lock as we know everything is ok at this point. |
| this.node.readLock.unlock(); |
| } |
| |
| // Don't touch node any more. |
| this.responseBuilder.success(true).term(this.term); |
| |
| // Ballot box is thread safe and tolerates disorder. |
| this.node.ballotBox.setLastCommittedIndex(this.committedIndex); |
| |
| this.done.sendResponse(this.responseBuilder.build()); |
| } |
| } |
| |
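    /**
     * Handles an append-entries request from the leader: checks term and leadership, verifies that the
     * previous log entry matches, treats empty requests as heartbeats/probes, and otherwise appends the
     * decoded entries through the log manager, replying asynchronously from {@link FollowerStableClosure}.
     */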
| @Override |
| public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) { |
| boolean doUnlock = true; |
| final long startMs = Utils.monotonicMs(); |
| this.writeLock.lock(); |
| final int entriesCount = Utils.size(request.entriesList()); |
| try { |
| if (!this.state.isActive()) { |
| LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); |
| } |
| |
| final PeerId serverId = new PeerId(); |
| if (!serverId.parse(request.serverId())) { |
| LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(), |
| request.serverId()); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Parse serverId failed: %s.", request.serverId()); |
| } |
| |
| // Check stale term |
| if (request.term() < this.currTerm) { |
| LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(), |
| request.serverId(), request.term(), this.currTerm); |
| return raftOptions.getRaftMessagesFactory() |
| .appendEntriesResponse() |
| .success(false) |
| .term(this.currTerm) |
| .build(); |
| } |
| |
| // Check term and state to step down |
| checkStepDown(request.term(), serverId); |
| if (!serverId.equals(this.leaderId)) { |
| LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.", |
| serverId, this.currTerm, this.leaderId); |
                // Increase the term by 1 and make both leaders step down to minimize the
                // damage caused by the split brain.
| stepDown(request.term() + 1, false, new Status(RaftError.ELEADERCONFLICT, |
| "More than one leader in the same term.")); |
| return raftOptions.getRaftMessagesFactory() |
| .appendEntriesResponse() |
| .success(false) // |
| .term(request.term() + 1) // |
| .build(); |
| } |
| |
| updateLastLeaderTimestamp(Utils.monotonicMs()); |
| |
| if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) { |
| LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId()); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EBUSY, |
| "Node %s:%s is installing snapshot.", this.groupId, this.serverId); |
| } |
| |
| final long prevLogIndex = request.prevLogIndex(); |
| final long prevLogTerm = request.prevLogTerm(); |
| final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex); |
| if (localPrevLogTerm != prevLogTerm) { |
| final long lastLogIndex = this.logManager.getLastLogIndex(); |
| |
| LOG.warn("Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, " + |
| "prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.", |
| getNodeId(), request.serverId(), request.term(), prevLogIndex, prevLogTerm, localPrevLogTerm, |
| lastLogIndex, entriesCount); |
| |
| return raftOptions.getRaftMessagesFactory() |
| .appendEntriesResponse() |
| .success(false) |
| .term(this.currTerm) |
| .lastLogIndex(lastLogIndex) |
| .build(); |
| } |
| |
| if (entriesCount == 0) { |
| // heartbeat or probe request |
| final AppendEntriesResponseBuilder respBuilder = raftOptions.getRaftMessagesFactory() |
| .appendEntriesResponse() |
| .success(true) |
| .term(this.currTerm) |
| .lastLogIndex(this.logManager.getLastLogIndex()); |
| doUnlock = false; |
| this.writeLock.unlock(); |
| // see the comments at FollowerStableClosure#run() |
| this.ballotBox.setLastCommittedIndex(Math.min(request.committedIndex(), prevLogIndex)); |
| return respBuilder.build(); |
| } |
| |
| // Parse request |
| long index = prevLogIndex; |
| final List<LogEntry> entries = new ArrayList<>(entriesCount); |
| ByteBuffer allData = request.data() != null ? request.data().asReadOnlyByteBuffer() : ByteString.EMPTY.asReadOnlyByteBuffer(); |
| |
| final List<RaftOutter.EntryMeta> entriesList = request.entriesList(); |
| for (int i = 0; i < entriesCount; i++) { |
| index++; |
| final RaftOutter.EntryMeta entry = entriesList.get(i); |
| |
| final LogEntry logEntry = logEntryFromMeta(index, allData, entry); |
| |
| if (logEntry != null) { |
| // Validate checksum |
| if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) { |
| long realChecksum = logEntry.checksum(); |
| LOG.error( |
| "Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, realChecksum={}", |
| logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), |
| realChecksum); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d", |
| logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), |
| realChecksum); |
| } |
| entries.add(logEntry); |
| } |
| } |
| |
| final FollowerStableClosure closure = new FollowerStableClosure( |
| request, |
| raftOptions.getRaftMessagesFactory().appendEntriesResponse().term(this.currTerm), |
| this, |
| done, |
| this.currTerm |
| ); |
| this.logManager.appendEntries(entries, closure); |
            // Update the configuration after the log manager has updated its in-memory state.
| checkAndSetConfiguration(true); |
| return null; |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs); |
| this.metrics.recordSize("handle-append-entries-count", entriesCount); |
| } |
| } |
| |
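    /**
     * Builds a {@link LogEntry} from the given entry metadata, slicing its payload from the shared data
     * buffer of the request. Returns {@code null} for entries of unknown type.
     */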
| private LogEntry logEntryFromMeta(final long index, final ByteBuffer allData, final RaftOutter.EntryMeta entry) { |
| if (entry.type() != EnumOutter.EntryType.ENTRY_TYPE_UNKNOWN) { |
| final LogEntry logEntry = new LogEntry(); |
| logEntry.setId(new LogId(index, entry.term())); |
| logEntry.setType(entry.type()); |
| |
            if (entry.hasChecksum()) {
                logEntry.setChecksum(entry.checksum()); // since 1.2.6
            }
| |
| final long dataLen = entry.dataLen(); |
| if (dataLen > 0) { |
| final byte[] bs = new byte[(int) dataLen]; |
| assert allData != null; |
| allData.get(bs, 0, bs.length); |
| logEntry.setData(ByteBuffer.wrap(bs)); |
| } |
| |
| if (entry.peersList() != null) { |
| if (entry.type() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) { |
| throw new IllegalStateException( |
| "Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: " |
| + entry.type()); |
| } |
| |
| fillLogEntryPeers(entry, logEntry); |
| } |
| else if (entry.type() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) { |
| throw new IllegalStateException( |
| "Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type"); |
| } |
| return logEntry; |
| } |
| return null; |
| } |
| |
| private void fillLogEntryPeers(final RaftOutter.EntryMeta entry, final LogEntry logEntry) { |
| // TODO refactor https://issues.apache.org/jira/browse/IGNITE-14832 |
| if (entry.peersList() != null) { |
| final List<PeerId> peers = new ArrayList<>(); |
| for (final String peerStr : entry.peersList()) { |
| final PeerId peer = new PeerId(); |
| peer.parse(peerStr); |
| peers.add(peer); |
| } |
| logEntry.setPeers(peers); |
| } |
| |
| if (entry.oldPeersList() != null) { |
| final List<PeerId> oldPeers = new ArrayList<>(); |
| for (final String peerStr : entry.oldPeersList()) { |
| final PeerId peer = new PeerId(); |
| peer.parse(peerStr); |
| oldPeers.add(peer); |
| } |
| logEntry.setOldPeers(oldPeers); |
| } |
| |
| if (entry.learnersList() != null) { |
| final List<PeerId> peers = new ArrayList<>(); |
| for (final String peerStr : entry.learnersList()) { |
| final PeerId peer = new PeerId(); |
| peer.parse(peerStr); |
| peers.add(peer); |
| } |
| logEntry.setLearners(peers); |
| } |
| |
| if (entry.oldLearnersList() != null) { |
| final List<PeerId> peers = new ArrayList<>(); |
| for (final String peerStr : entry.oldLearnersList()) { |
| final PeerId peer = new PeerId(); |
| peer.parse(peerStr); |
| peers.add(peer); |
| } |
| logEntry.setOldLearners(peers); |
| } |
| } |
| |
    // Called when the leader receives a greater term in an AppendEntriesResponse.
| void increaseTermTo(final long newTerm, final Status status) { |
| this.writeLock.lock(); |
| try { |
| if (newTerm < this.currTerm) { |
| return; |
| } |
| stepDown(newTerm, false, status); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Peer catch up callback |
| */ |
| private static class OnCaughtUp extends CatchUpClosure { |
| private final NodeImpl node; |
| private final long term; |
| private final PeerId peer; |
| private final long version; |
| |
| OnCaughtUp(final NodeImpl node, final long term, final PeerId peer, final long version) { |
| super(); |
| this.node = node; |
| this.term = term; |
| this.peer = peer; |
| this.version = version; |
| } |
| |
| @Override |
| public void run(final Status status) { |
| this.node.onCaughtUp(this.peer, this.term, this.version, status); |
| } |
| } |
| |
| private void onCaughtUp(final PeerId peer, final long term, final long version, final Status st) { |
| this.writeLock.lock(); |
| try { |
| // check current_term and state to avoid ABA problem |
| if (term != this.currTerm && this.state != State.STATE_LEADER) { |
| // term has changed and nothing should be done, otherwise there will be |
| // an ABA problem. |
| return; |
| } |
| if (st.isOk()) { |
| // Caught up successfully |
| this.confCtx.onCaughtUp(version, peer, true); |
| return; |
| } |
| // Retry if this peer is still alive |
| if (st.getCode() == RaftError.ETIMEDOUT.getNumber() |
| && Utils.monotonicMs() - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= this.options |
| .getElectionTimeoutMs()) { |
| LOG.debug("Node {} waits peer {} to catch up.", getNodeId(), peer); |
| final OnCaughtUp caughtUp = new OnCaughtUp(this, term, peer, version); |
| final long dueTime = Utils.nowMs() + this.options.getElectionTimeoutMs(); |
| if (this.replicatorGroup.waitCaughtUp(peer, this.options.getCatchupMargin(), dueTime, caughtUp)) { |
| return; |
| } |
| LOG.warn("Node {} waitCaughtUp failed, peer={}.", getNodeId(), peer); |
| } |
| LOG.warn("Node {} caughtUp failed, status={}, peer={}.", getNodeId(), st, peer); |
| this.confCtx.onCaughtUp(version, peer, false); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * @param conf The configuration. |
| * @param monotonicNowMs The timestamp. |
| * @param stepDownOnCheckFail {@code True} to step down on check fail. |
| * @return {@code True} if a majority of peers are alive. |
| */ |
| private boolean checkDeadNodes(final Configuration conf, final long monotonicNowMs, |
| final boolean stepDownOnCheckFail) { |
        // Check learner replicators first.
| for (final PeerId peer : conf.getLearners()) { |
| checkReplicator(peer); |
| } |
        // Ensure that a quorum of nodes is alive.
| final List<PeerId> peers = conf.listPeers(); |
| final Configuration deadNodes = new Configuration(); |
| if (checkDeadNodes0(peers, monotonicNowMs, true, deadNodes)) { |
| return true; |
| } |
| if (stepDownOnCheckFail) { |
| LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.", |
| getNodeId(), this.currTerm, deadNodes, conf); |
| final Status status = new Status(); |
| status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), |
| peers.size()); |
| stepDown(this.currTerm, false, status); |
| } |
| return false; |
| } |
| |
| /** |
| * TODO asch https://issues.apache.org/jira/browse/IGNITE-14843 |
| * @param peers Peers list. |
| * @param monotonicNowMs The timestamp. |
| * @param checkReplicator {@code True} to check replicator. |
| * @param deadNodes The configuration. |
| * @return {@code True} if a majority of nodes are alive. |
| */ |
| private boolean checkDeadNodes0(final List<PeerId> peers, final long monotonicNowMs, final boolean checkReplicator, |
| final Configuration deadNodes) { |
| final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs(); |
| int aliveCount = 0; |
| long startLease = Long.MAX_VALUE; |
| for (final PeerId peer : peers) { |
| if (peer.equals(this.serverId)) { |
| aliveCount++; |
| continue; |
| } |
| if (checkReplicator) { |
| checkReplicator(peer); |
| } |
| final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer); |
| if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) { |
| aliveCount++; |
| if (startLease > lastRpcSendTimestamp) { |
| startLease = lastRpcSendTimestamp; |
| } |
| continue; |
| } |
| if (deadNodes != null) { |
| deadNodes.addPeer(peer); |
| } |
| } |
| if (aliveCount >= peers.size() / 2 + 1) { |
| updateLastLeaderTimestamp(startLease); |
| return true; |
| } |
| return false; |
| } |
| |
    // Must be called while holding the read lock.
| private List<PeerId> getAliveNodes(final Collection<PeerId> peers, final long monotonicNowMs) { |
| final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs(); |
| final List<PeerId> alivePeers = new ArrayList<>(); |
| for (final PeerId peer : peers) { |
| if (peer.equals(this.serverId)) { |
| alivePeers.add(peer.copy()); |
| continue; |
| } |
| if (monotonicNowMs - this.replicatorGroup.getLastRpcSendTimestamp(peer) <= leaderLeaseTimeoutMs) { |
| alivePeers.add(peer.copy()); |
| } |
| } |
| return alivePeers; |
| } |
| |
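    /**
     * Step-down timer callback: first checks dead nodes under the read lock and returns early when a quorum
     * is still alive; otherwise re-checks under the write lock and steps down if the quorum is lost.
     */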
| @SuppressWarnings({"LoopStatementThatDoesntLoop", "ConstantConditions"}) |
| private void handleStepDownTimeout() { |
| do { |
| this.readLock.lock(); |
| try { |
| if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) { |
| LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, |
| this.state); |
| return; |
| } |
| final long monotonicNowMs = Utils.monotonicMs(); |
| if (!checkDeadNodes(this.conf.getConf(), monotonicNowMs, false)) { |
| break; |
| } |
| if (!this.conf.getOldConf().isEmpty()) { |
| if (!checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, false)) { |
| break; |
| } |
| } |
| return; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| while (false); |
| |
| this.writeLock.lock(); |
| try { |
| if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) { |
| LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state); |
| return; |
| } |
| final long monotonicNowMs = Utils.monotonicMs(); |
| checkDeadNodes(this.conf.getConf(), monotonicNowMs, true); |
| if (!this.conf.getOldConf().isEmpty()) { |
| checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, true); |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| /** |
| * Configuration changed callback. |
| */ |
| private class ConfigurationChangeDone implements Closure { |
| private final long term; |
| private final boolean leaderStart; |
| |
| ConfigurationChangeDone(final long term, final boolean leaderStart) { |
| super(); |
| this.term = term; |
| this.leaderStart = leaderStart; |
| } |
| |
| @Override |
| public void run(final Status status) { |
| if (status.isOk()) { |
| onConfigurationChangeDone(this.term); |
| if (this.leaderStart) { |
| getOptions().getFsm().onLeaderStart(this.term); |
| } |
| } |
| else { |
| LOG.error("Fail to run ConfigurationChangeDone, status: {}.", status); |
| } |
| } |
| } |
| |
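    /**
     * Appends a configuration-change log entry for the new configuration. The quorum for this very entry is
     * computed from the new configuration via the ballot box; must be called while the configuration context
     * is busy.
     */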
| private void unsafeApplyConfiguration(final Configuration newConf, final Configuration oldConf, |
| final boolean leaderStart) { |
| Requires.requireTrue(this.confCtx.isBusy(), "ConfigurationContext is not busy"); |
| final LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); |
| entry.setId(new LogId(0, this.currTerm)); |
| entry.setPeers(newConf.listPeers()); |
| entry.setLearners(newConf.listLearners()); |
| if (oldConf != null) { |
| entry.setOldPeers(oldConf.listPeers()); |
| entry.setOldLearners(oldConf.listLearners()); |
| } |
| final ConfigurationChangeDone configurationChangeDone = new ConfigurationChangeDone(this.currTerm, leaderStart); |
        // Use the new configuration to determine the quorum for this very log entry.
| if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone)) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), configurationChangeDone, new Status(RaftError.EINTERNAL, "Fail to append task.")); |
| return; |
| } |
| final List<LogEntry> entries = new ArrayList<>(); |
| entries.add(entry); |
| this.logManager.appendEntries(entries, new LeaderStableClosure(entries)); |
| checkAndSetConfiguration(false); |
| } |
| |
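    /**
     * Validates and starts a configuration change: only the leader may change the configuration, and only one
     * change may be in progress at a time; an identical configuration completes immediately.
     */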
| private void unsafeRegisterConfChange(final Configuration oldConf, final Configuration newConf, |
| final Closure done) { |
| |
| Requires.requireTrue(newConf.isValid(), "Invalid new conf: %s", newConf); |
        // The new conf entry (to be stored in the log manager) must be valid.
| Requires.requireTrue(new ConfigurationEntry(null, newConf, oldConf).isValid(), "Invalid conf entry: %s", |
| newConf); |
| |
| if (this.state != State.STATE_LEADER) { |
| LOG.warn("Node {} refused configuration changing as the state={}.", getNodeId(), this.state); |
| if (done != null) { |
| final Status status = new Status(); |
| if (this.state == State.STATE_TRANSFERRING) { |
| status.setError(RaftError.EBUSY, "Is transferring leadership."); |
| } |
| else { |
| status.setError(RaftError.EPERM, "Not leader"); |
| } |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, status); |
| } |
| return; |
| } |
| // check concurrent conf change |
| if (this.confCtx.isBusy()) { |
| LOG.warn("Node {} refused configuration concurrent changing.", getNodeId()); |
| if (done != null) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, new Status(RaftError.EBUSY, "Doing another configuration change.")); |
| } |
| return; |
| } |
        // Return immediately when the new configuration equals the current one.
| if (this.conf.getConf().equals(newConf)) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done); |
| return; |
| } |
| this.confCtx.start(oldConf, newConf, done); |
| } |
| |
| private void afterShutdown() { |
| List<Closure> savedDoneList = null; |
| this.writeLock.lock(); |
| try { |
| if (!this.shutdownContinuations.isEmpty()) { |
| savedDoneList = new ArrayList<>(this.shutdownContinuations); |
| } |
| if (this.logStorage != null) { |
| this.logStorage.shutdown(); |
| } |
| this.state = State.STATE_SHUTDOWN; |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| if (savedDoneList != null) { |
| for (final Closure closure : savedDoneList) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), closure); |
| } |
| } |
| } |
| |
| @Override |
| public NodeOptions getOptions() { |
| return this.options; |
| } |
| |
| public Scheduler getTimerManager() { |
| return this.timerManager; |
| } |
| |
| @Override |
| public RaftOptions getRaftOptions() { |
| return this.raftOptions; |
| } |
| |
| @OnlyForTest |
| long getCurrentTerm() { |
| this.readLock.lock(); |
| try { |
| return this.currTerm; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @OnlyForTest |
| ConfigurationEntry getConf() { |
| this.readLock.lock(); |
| try { |
| return this.conf; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void shutdown() { |
| shutdown(null); |
| } |
| |
| public void onConfigurationChangeDone(final long term) { |
| this.writeLock.lock(); |
| try { |
| if (term != this.currTerm || this.state.compareTo(State.STATE_TRANSFERRING) > 0) { |
| LOG.warn("Node {} process onConfigurationChangeDone at term {} while state={}, currTerm={}.", |
| getNodeId(), term, this.state, this.currTerm); |
| return; |
| } |
| this.confCtx.nextStage(); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public PeerId getLeaderId() { |
| this.readLock.lock(); |
| try { |
| return this.leaderId.isEmpty() ? null : this.leaderId; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getGroupId() { |
| return this.groupId; |
| } |
| |
| public PeerId getServerId() { |
| return this.serverId; |
| } |
| |
| @Override |
| public NodeId getNodeId() { |
| if (this.nodeId == null) { |
| this.nodeId = new NodeId(this.groupId, this.serverId); |
| } |
| return this.nodeId; |
| } |
| |
| public RaftClientService getRpcClientService() { |
| return this.rpcClientService; |
| } |
| |
| public void onError(final RaftException error) { |
| LOG.warn("Node {} got error: {}.", getNodeId(), (Object)error); |
| if (this.fsmCaller != null) { |
| // onError of fsmCaller is guaranteed to be executed once. |
| this.fsmCaller.onError(error); |
| } |
| if (this.readOnlyService != null) { |
| this.readOnlyService.setError(error); |
| } |
| this.writeLock.lock(); |
| try { |
            // If it is the leader, we need to wake up a new candidate;
            // if it is a follower, also step down so that on_stop_following is invoked.
| if (this.state.compareTo(State.STATE_FOLLOWER) <= 0) { |
| stepDown(this.currTerm, this.state == State.STATE_LEADER, new Status(RaftError.EBADNODE, |
| "Raft node(leader or candidate) is in error.")); |
| } |
| if (this.state.compareTo(State.STATE_ERROR) < 0) { |
| this.state = State.STATE_ERROR; |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
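    /**
     * Handles a vote response while this node is a candidate: steps down on a higher term, ignores stale
     * responses, and becomes the leader once a quorum of votes has been granted.
     */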
| public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) { |
| this.writeLock.lock(); |
| try { |
| if (this.state != State.STATE_CANDIDATE) { |
| LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.", |
| getNodeId(), peerId, this.state); |
| return; |
| } |
| // check stale term |
| if (term != this.currTerm) { |
| LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(), |
| peerId, term, this.currTerm); |
| return; |
| } |
| // check response term |
| if (response.term() > this.currTerm) { |
| LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(), |
| peerId, response.term(), this.currTerm); |
| stepDown(response.term(), false, new Status(RaftError.EHIGHERTERMRESPONSE, |
| "Raft node receives higher term request_vote_response.")); |
| return; |
| } |
            // Check whether a quorum of votes has been granted.
| if (response.granted()) { |
| this.voteCtx.grant(peerId); |
| if (this.voteCtx.isGranted()) { |
| becomeLeader(); |
| } |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| private class OnRequestVoteRpcDone extends RpcResponseClosureAdapter<RequestVoteResponse> { |
| |
| final long startMs; |
| final PeerId peer; |
| final long term; |
| final NodeImpl node; |
| RequestVoteRequest request; |
| |
| OnRequestVoteRpcDone(final PeerId peer, final long term, final NodeImpl node) { |
| super(); |
| this.startMs = Utils.monotonicMs(); |
| this.peer = peer; |
| this.term = term; |
| this.node = node; |
| } |
| |
| @Override |
| public void run(final Status status) { |
| NodeImpl.this.metrics.recordLatency("request-vote", Utils.monotonicMs() - this.startMs); |
| if (!status.isOk()) { |
| LOG.warn("Node {} RequestVote to {} error: {}.", this.node.getNodeId(), this.peer, status); |
| } |
| else { |
| this.node.handleRequestVoteResponse(this.peer, this.term, getResponse()); |
| } |
| } |
| } |
| |
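    /**
     * Handles a pre-vote response while this node is a follower: steps down on a higher term, ignores stale
     * responses, and starts a real election via {@link #electSelf()} once a quorum of pre-votes is granted.
     */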
| public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) { |
| boolean doUnlock = true; |
| this.writeLock.lock(); |
| try { |
| if (this.state != State.STATE_FOLLOWER) { |
| LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.", |
| getNodeId(), peerId, this.state); |
| return; |
| } |
| if (term != this.currTerm) { |
| LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(), |
| peerId, term, this.currTerm); |
| return; |
| } |
| if (response.term() > this.currTerm) { |
| LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId, |
| response.term(), this.currTerm); |
| stepDown(response.term(), false, new Status(RaftError.EHIGHERTERMRESPONSE, |
| "Raft node receives higher term pre_vote_response.")); |
| return; |
| } |
| LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId, |
| response.term(), response.granted()); |
            // Check whether a quorum of pre-votes has been granted.
| if (response.granted()) { |
| this.prevVoteCtx.grant(peerId); |
| if (this.prevVoteCtx.isGranted()) { |
| doUnlock = false; |
| electSelf(); |
| } |
| } |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| } |
| } |
| |
| private class OnPreVoteRpcDone extends RpcResponseClosureAdapter<RequestVoteResponse> { |
| |
| final long startMs; |
| final PeerId peer; |
| final long term; |
| RequestVoteRequest request; |
| |
| OnPreVoteRpcDone(final PeerId peer, final long term) { |
| super(); |
| this.startMs = Utils.monotonicMs(); |
| this.peer = peer; |
| this.term = term; |
| } |
| |
| @Override |
| public void run(final Status status) { |
| NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs); |
| if (!status.isOk()) { |
| LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status); |
| } |
| else { |
| handlePreVoteResponse(this.peer, this.term, getResponse()); |
| } |
| } |
| } |
| |
    // Called while holding the write lock; the lock is released inside (either here or in electSelf).
| private void preVote() { |
| long oldTerm; |
| try { |
| LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm); |
| if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) { |
| LOG.warn( |
| "Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.", |
| getNodeId(), this.currTerm); |
| return; |
| } |
| if (!this.conf.contains(this.serverId)) { |
| LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf); |
| return; |
| } |
| oldTerm = this.currTerm; |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| |
| final LogId lastLogId = this.logManager.getLastLogId(true); |
| |
| boolean doUnlock = true; |
| this.writeLock.lock(); |
| try { |
            // The pre-vote needs an ABA check after releasing and re-acquiring the write lock.
| if (oldTerm != this.currTerm) { |
| LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); |
| return; |
| } |
| this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); |
| for (final PeerId peer : this.conf.listPeers()) { |
| if (peer.equals(this.serverId)) { |
| continue; |
| } |
| if (!this.rpcClientService.connect(peer.getEndpoint())) { |
| LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint()); |
| continue; |
| } |
| final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm); |
| done.request = raftOptions.getRaftMessagesFactory() |
| .requestVoteRequest() |
| .preVote(true) // it's a pre-vote request. |
| .groupId(this.groupId) |
| .serverId(this.serverId.toString()) |
| .peerId(peer.toString()) |
| .term(this.currTerm + 1) // next term |
| .lastLogIndex(lastLogId.getIndex()) |
| .lastLogTerm(lastLogId.getTerm()) |
| .build(); |
| this.rpcClientService.preVote(peer.getEndpoint(), done.request, done); |
| } |
| this.prevVoteCtx.grant(this.serverId); |
| if (this.prevVoteCtx.isGranted()) { |
| doUnlock = false; |
| electSelf(); |
| } |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| } |
| } |
| |
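    /**
     * Vote timer callback: when the election did not gather a quorum in time, either steps down and restarts
     * the pre-vote phase (if so configured) or retries voting for itself.
     */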
| private void handleVoteTimeout() { |
| this.writeLock.lock(); |
| if (this.state != State.STATE_CANDIDATE) { |
| this.writeLock.unlock(); |
| return; |
| } |
| |
| if (this.raftOptions.isStepDownWhenVoteTimedout()) { |
| LOG.warn( |
| "Candidate node {} term {} steps down when election reaching vote timeout: fail to get quorum vote-granted.", |
| this.nodeId, this.currTerm); |
| stepDown(this.currTerm, false, new Status(RaftError.ETIMEDOUT, |
| "Vote timeout: fail to get quorum vote-granted.")); |
| // unlock in preVote |
| preVote(); |
| } |
| else { |
| LOG.debug("Node {} term {} retry to vote self.", getNodeId(), this.currTerm); |
| // unlock in electSelf |
| electSelf(); |
| } |
| } |
| |
| @Override |
| public boolean isLeader() { |
| return isLeader(true); |
| } |
| |
| @Override |
| public boolean isLeader(final boolean blocking) { |
| if (!blocking) { |
| return this.state == State.STATE_LEADER; |
| } |
| this.readLock.lock(); |
| try { |
| return this.state == State.STATE_LEADER; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void shutdown(final Closure done) { |
| List<RepeatedTimer> timers = null; |
| this.writeLock.lock(); |
| try { |
| LOG.info("Node {} shutdown, currTerm={} state={}.", getNodeId(), this.currTerm, this.state); |
| if (this.state.compareTo(State.STATE_SHUTTING) < 0) { |
                // If it is the leader, set wakeup_a_candidate to true;
                // if it is a follower, on_stop_following is called inside step_down.
| if (this.state.compareTo(State.STATE_FOLLOWER) <= 0) { |
| stepDown(this.currTerm, this.state == State.STATE_LEADER, |
| new Status(RaftError.ESHUTDOWN, "Raft node is going to quit.")); |
| } |
| this.state = State.STATE_SHUTTING; |
| // Stop all timers |
| timers = stopAllTimers(); |
| if (this.readOnlyService != null) { |
| this.readOnlyService.shutdown(); |
| } |
| if (this.logManager != null) { |
| this.logManager.shutdown(); |
| } |
| if (this.metaStorage != null) { |
| this.metaStorage.shutdown(); |
| } |
| if (this.snapshotExecutor != null) { |
| this.snapshotExecutor.shutdown(); |
| } |
| if (this.wakingCandidate != null) { |
| Replicator.stop(this.wakingCandidate); |
| } |
| if (this.fsmCaller != null) { |
| this.fsmCaller.shutdown(); |
| } |
| if (this.rpcClientService != null) { |
| this.rpcClientService.shutdown(); |
| } |
| if (this.applyQueue != null) { |
| final CountDownLatch latch = new CountDownLatch(1); |
| this.shutdownLatch = latch; |
| |
| Utils.runInThread(this.getOptions().getCommonExecutor(), |
| () -> this.applyQueue.publishEvent((event, sequence) -> { |
| event.groupId = groupId; |
| event.shutdownLatch = latch; |
| })); |
| } |
| if (this.timerManager != null) { |
| this.timerManager.shutdown(); |
| } |
| } |
| |
| if (this.state != State.STATE_SHUTDOWN) { |
| if (done != null) { |
| this.shutdownContinuations.add(done); |
| } |
| return; |
| } |
| |
            // This node is down, so it's OK to invoke done right now. Don't invoke it
            // in place, to avoid a write-lock deadlock when done.run() tries to acquire
            // a write lock that is already held by the caller.
| if (done != null) { |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done); |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| |
            // Destroy all timers outside the lock.
| if (timers != null) { |
| destroyAllTimers(timers); |
| } |
| } |
| } |
| |
    // Must be called while holding the lock.
| private List<RepeatedTimer> stopAllTimers() { |
| final List<RepeatedTimer> timers = new ArrayList<>(); |
| if (this.electionTimer != null) { |
| this.electionTimer.stop(); |
| timers.add(this.electionTimer); |
| } |
| if (this.voteTimer != null) { |
| this.voteTimer.stop(); |
| timers.add(this.voteTimer); |
| } |
| if (this.stepDownTimer != null) { |
| this.stepDownTimer.stop(); |
| timers.add(this.stepDownTimer); |
| } |
| if (this.snapshotTimer != null) { |
| this.snapshotTimer.stop(); |
| timers.add(this.snapshotTimer); |
| } |
| return timers; |
| } |
| |
| private void destroyAllTimers(final List<RepeatedTimer> timers) { |
| for (final RepeatedTimer timer : timers) { |
| timer.destroy(); |
| } |
| } |
| |
| @Override |
| public synchronized void join() throws InterruptedException { |
| if (this.shutdownLatch != null) { |
| if (this.readOnlyService != null) { |
| this.readOnlyService.join(); |
| } |
| if (this.logManager != null) { |
| this.logManager.join(); |
| } |
| if (this.snapshotExecutor != null) { |
| this.snapshotExecutor.join(); |
| } |
| if (this.wakingCandidate != null) { |
| Replicator.join(this.wakingCandidate); |
| } |
| this.shutdownLatch.await(); |
| this.applyDisruptor.unsubscribe(groupId); |
| this.shutdownLatch = null; |
| } |
| if (this.fsmCaller != null) { |
| this.fsmCaller.join(); |
| } |
| } |
| |
| private static class StopTransferArg { |
| final NodeImpl node; |
| final long term; |
| final PeerId peer; |
| |
| StopTransferArg(final NodeImpl node, final long term, final PeerId peer) { |
| super(); |
| this.node = node; |
| this.term = term; |
| this.peer = peer; |
| } |
| } |
| |
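    /**
     * Leadership-transfer timer callback: if the transfer did not complete within the election timeout, stops
     * the transfer and restores this node to the leader state.
     */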
| private void handleTransferTimeout(final long term, final PeerId peer) { |
| LOG.info("Node {} failed to transfer leadership to peer {}, reached timeout.", getNodeId(), peer); |
| this.writeLock.lock(); |
| try { |
| if (term == this.currTerm) { |
| this.replicatorGroup.stopTransferLeadership(peer); |
| if (this.state == State.STATE_TRANSFERRING) { |
| this.fsmCaller.onLeaderStart(term); |
| this.state = State.STATE_LEADER; |
| this.stopTransferArg = null; |
| } |
| } |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| private void onTransferTimeout(final StopTransferArg arg) { |
| arg.node.handleTransferTimeout(arg.term, arg.peer); |
| } |
| |
| /** |
| * Retrieve current configuration this node seen so far. It's not a reliable way to retrieve cluster peers info, you |
| * should use {@link #listPeers()} instead. |
| * |
| * @return current configuration. |
| */ |
| public Configuration getCurrentConf() { |
| this.readLock.lock(); |
| try { |
| if (this.conf != null && this.conf.getConf() != null) { |
| return this.conf.getConf().copy(); |
| } |
| return null; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<PeerId> listPeers() { |
| this.readLock.lock(); |
| try { |
| if (this.state != State.STATE_LEADER) { |
| throw new IllegalStateException("Not leader"); |
| } |
| return this.conf.getConf().listPeers(); |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<PeerId> listAlivePeers() { |
| this.readLock.lock(); |
| try { |
| if (this.state != State.STATE_LEADER) { |
| throw new IllegalStateException("Not leader"); |
| } |
| return getAliveNodes(this.conf.getConf().getPeers(), Utils.monotonicMs()); |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<PeerId> listLearners() { |
| this.readLock.lock(); |
| try { |
| if (this.state != State.STATE_LEADER) { |
| throw new IllegalStateException("Not leader"); |
| } |
| return this.conf.getConf().listLearners(); |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<PeerId> listAliveLearners() { |
| this.readLock.lock(); |
| try { |
| if (this.state != State.STATE_LEADER) { |
| throw new IllegalStateException("Not leader"); |
| } |
| return getAliveNodes(this.conf.getConf().getLearners(), Utils.monotonicMs()); |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addPeer(final PeerId peer, final Closure done) { |
| Requires.requireNonNull(peer, "Null peer"); |
| this.writeLock.lock(); |
| try { |
| Requires.requireTrue(!this.conf.getConf().contains(peer), "Peer already exists in current configuration"); |
| |
| final Configuration newConf = new Configuration(this.conf.getConf()); |
| newConf.addPeer(peer); |
| unsafeRegisterConfChange(this.conf.getConf(), newConf, done); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void removePeer(final PeerId peer, final Closure done) { |
| Requires.requireNonNull(peer, "Null peer"); |
| this.writeLock.lock(); |
| try { |
| Requires.requireTrue(this.conf.getConf().contains(peer), "Peer not found in current configuration"); |
| |
| final Configuration newConf = new Configuration(this.conf.getConf()); |
| newConf.removePeer(peer); |
| unsafeRegisterConfChange(this.conf.getConf(), newConf, done); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void changePeers(final Configuration newPeers, final Closure done) { |
| Requires.requireNonNull(newPeers, "Null new peers"); |
| Requires.requireTrue(!newPeers.isEmpty(), "Empty new peers"); |
| this.writeLock.lock(); |
| try { |
| LOG.info("Node {} change peers from {} to {}.", getNodeId(), this.conf.getConf(), newPeers); |
| unsafeRegisterConfChange(this.conf.getConf(), newPeers, done); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Status resetPeers(final Configuration newPeers) { |
| Requires.requireNonNull(newPeers, "Null new peers"); |
| Requires.requireTrue(!newPeers.isEmpty(), "Empty new peers"); |
| Requires.requireTrue(newPeers.isValid(), "Invalid new peers: %s", newPeers); |
| this.writeLock.lock(); |
| try { |
| if (newPeers.isEmpty()) { |
| LOG.warn("Node {} set empty peers.", getNodeId()); |
| return new Status(RaftError.EINVAL, "newPeers is empty"); |
| } |
| if (!this.state.isActive()) { |
| LOG.warn("Node {} is in state {}, can't set peers.", getNodeId(), this.state); |
| return new Status(RaftError.EPERM, "Bad state: %s", this.state); |
| } |
            // Bootstrapping from an empty configuration?
| if (this.conf.getConf().isEmpty()) { |
| LOG.info("Node {} set peers to {} from empty.", getNodeId(), newPeers); |
| this.conf.setConf(newPeers); |
| stepDown(this.currTerm + 1, false, new Status(RaftError.ESETPEER, "Set peer from empty configuration")); |
| return Status.OK(); |
| } |
| if (this.state == State.STATE_LEADER && this.confCtx.isBusy()) { |
| LOG.warn("Node {} set peers need wait current conf changing.", getNodeId()); |
| return new Status(RaftError.EBUSY, "Changing to another configuration"); |
| } |
            // If the new configuration equals the current one (possibly a retry), return directly.
| if (this.conf.getConf().equals(newPeers)) { |
| return Status.OK(); |
| } |
| final Configuration newConf = new Configuration(newPeers); |
| LOG.info("Node {} set peers from {} to {}.", getNodeId(), this.conf.getConf(), newPeers); |
| this.conf.setConf(newConf); |
| this.conf.getOldConf().reset(); |
| stepDown(this.currTerm + 1, false, new Status(RaftError.ESETPEER, "Raft node set peer normally")); |
| return Status.OK(); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void addLearners(final List<PeerId> learners, final Closure done) { |
| checkPeers(learners); |
| this.writeLock.lock(); |
| try { |
| final Configuration newConf = new Configuration(this.conf.getConf()); |
| for (final PeerId peer : learners) { |
| newConf.addLearner(peer); |
| } |
| unsafeRegisterConfChange(this.conf.getConf(), newConf, done); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| |
| } |
| |
| private void checkPeers(final List<PeerId> peers) { |
| Requires.requireNonNull(peers, "Null peers"); |
| Requires.requireTrue(!peers.isEmpty(), "Empty peers"); |
| for (final PeerId peer : peers) { |
| Requires.requireNonNull(peer, "Null peer"); |
| } |
| } |
| |
| @Override |
| public void removeLearners(final List<PeerId> learners, final Closure done) { |
| checkPeers(learners); |
| this.writeLock.lock(); |
| try { |
| final Configuration newConf = new Configuration(this.conf.getConf()); |
| for (final PeerId peer : learners) { |
| newConf.removeLearner(peer); |
| } |
| unsafeRegisterConfChange(this.conf.getConf(), newConf, done); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void resetLearners(final List<PeerId> learners, final Closure done) { |
| checkPeers(learners); |
| this.writeLock.lock(); |
| try { |
| final Configuration newConf = new Configuration(this.conf.getConf()); |
| newConf.setLearners(new LinkedHashSet<>(learners)); |
| unsafeRegisterConfChange(this.conf.getConf(), newConf, done); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void snapshot(final Closure done) { |
| doSnapshot(done); |
| } |
| |
| private void doSnapshot(final Closure done) { |
| if (this.snapshotExecutor != null) { |
| this.snapshotExecutor.doSnapshot(done); |
| } |
| else { |
| if (done != null) { |
| final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported"); |
| Utils.runClosureInThread(this.getOptions().getCommonExecutor(), done, status); |
| } |
| } |
| } |
| |
| @Override |
| public void resetElectionTimeoutMs(final int electionTimeoutMs) { |
| Requires.requireTrue(electionTimeoutMs > 0, "Invalid electionTimeoutMs"); |
| this.writeLock.lock(); |
| try { |
| this.options.setElectionTimeoutMs(electionTimeoutMs); |
| this.replicatorGroup.resetHeartbeatInterval(heartbeatTimeout(this.options.getElectionTimeoutMs())); |
| this.replicatorGroup.resetElectionTimeoutInterval(electionTimeoutMs); |
| LOG.info("Node {} reset electionTimeout, currTimer {} state {} new electionTimeout {}.", getNodeId(), |
| this.currTerm, this.state, electionTimeoutMs); |
| this.electionTimer.reset(electionTimeoutMs); |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Status transferLeadershipTo(final PeerId peer) { |
| Requires.requireNonNull(peer, "Null peer"); |
| this.writeLock.lock(); |
| try { |
| if (this.state != State.STATE_LEADER) { |
| LOG.warn("Node {} can't transfer leadership to peer {} as it is in state {}.", getNodeId(), peer, |
| this.state); |
| return new Status(this.state == State.STATE_TRANSFERRING ? RaftError.EBUSY : RaftError.EPERM, |
| "Not a leader"); |
| } |
| if (this.confCtx.isBusy()) { |
                // It's very messy to deal with the case where the |peer| receives a
                // TimeoutNowRequest and increases the term while, somehow, another leader
                // that has not replicated the newest configuration has been elected.
                // If no add_peer with this very |peer| is ever invoked afterwards, nor is
                // this peer killed, it will spin in the voting procedure and make each new
                // leader step down when it reaches the vote timeout and starts to vote
                // (because it will increase the term of the group).
                // To keep things simple, refuse the operation and force users to invoke
                // transfer_leadership_to after the configuration change has completed, so
                // that the peer's configuration is up to date when it receives the
                // TimeoutNowRequest.
| LOG.warn( |
| "Node {} refused to transfer leadership to peer {} when the leader is changing the configuration.", |
| getNodeId(), peer); |
| return new Status(RaftError.EBUSY, "Changing the configuration"); |
| } |
| |
| PeerId peerId = peer.copy(); |
            // If peer_id is ANY_PEER (0.0.0.0:0:0), the peer with the largest
            // last_log_id will be selected.
| if (peerId.equals(PeerId.ANY_PEER)) { |
| LOG.info("Node {} starts to transfer leadership to any peer.", getNodeId()); |
| if ((peerId = this.replicatorGroup.findTheNextCandidate(this.conf)) == null) { |
| return new Status(-1, "Candidate not found for any peer"); |
| } |
| } |
| if (peerId.equals(this.serverId)) { |
| LOG.info("Node {} transferred leadership to self.", this.serverId); |
| return Status.OK(); |
| } |
| if (!this.conf.contains(peerId)) { |
| LOG.info("Node {} refused to transfer leadership to peer {} as it is not in {}.", getNodeId(), peer, |
| this.conf); |
| return new Status(RaftError.EINVAL, "Not in current configuration"); |
| } |
| |
| final long lastLogIndex = this.logManager.getLastLogIndex(); |
| if (!this.replicatorGroup.transferLeadershipTo(peerId, lastLogIndex)) { |
| LOG.warn("No such peer {}.", peer); |
| return new Status(RaftError.EINVAL, "No such peer %s", peer); |
| } |
| this.state = State.STATE_TRANSFERRING; |
| final Status status = new Status(RaftError.ETRANSFERLEADERSHIP, |
| "Raft leader is transferring leadership to %s", peerId); |
| onLeaderStop(status); |
| LOG.info("Node {} starts to transfer leadership to peer {}.", getNodeId(), peer); |
| final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, peerId); |
| this.stopTransferArg = stopArg; |
| this.transferTimer = this.timerManager.schedule(() -> onTransferTimeout(stopArg), |
| this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS); |
| |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| return Status.OK(); |
| } |
| |
| private void onLeaderStop(final Status status) { |
| this.replicatorGroup.clearFailureReplicators(); |
| this.fsmCaller.onLeaderStop(status); |
| } |
| |
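    /**
     * Handles a TimeoutNow request sent by a leader that is transferring leadership: if the request term
     * matches and this node is a follower, it responds with the next term and immediately starts an election.
     */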
| @Override |
| public Message handleTimeoutNowRequest(final TimeoutNowRequest request, final RpcRequestClosure done) { |
| boolean doUnlock = true; |
| this.writeLock.lock(); |
| try { |
| if (request.term() != this.currTerm) { |
| final long savedCurrTerm = this.currTerm; |
| if (request.term() > this.currTerm) { |
| stepDown(request.term(), false, new Status(RaftError.EHIGHERTERMREQUEST, |
| "Raft node receives higher term request")); |
| } |
| LOG.info("Node {} received TimeoutNowRequest from {} while currTerm={} didn't match requestTerm={}.", |
| getNodeId(), request.peerId(), savedCurrTerm, request.term()); |
| return raftOptions.getRaftMessagesFactory() |
| .timeoutNowResponse() |
| .term(this.currTerm) |
| .success(false) |
| .build(); |
| } |
| if (this.state != State.STATE_FOLLOWER) { |
| LOG.info("Node {} received TimeoutNowRequest from {}, while state={}, term={}.", getNodeId(), |
| request.serverId(), this.state, this.currTerm); |
| return raftOptions.getRaftMessagesFactory() |
| .timeoutNowResponse() |
| .term(this.currTerm) |
| .success(false) |
| .build(); |
| } |
| |
| final long savedTerm = this.currTerm; |
| final TimeoutNowResponse resp = raftOptions.getRaftMessagesFactory() |
| .timeoutNowResponse() |
| .term(this.currTerm + 1) // |
| .success(true) // |
| .build(); |
| // Parallelize response and election |
| done.sendResponse(resp); |
| doUnlock = false; |
| electSelf(); |
| LOG.info("Node {} received TimeoutNowRequest from {}, term={}.", getNodeId(), request.serverId(), |
| savedTerm); |
| } |
| finally { |
| if (doUnlock) { |
| this.writeLock.unlock(); |
| } |
| } |
| return null; |
| } |
| |
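    /**
     * Handles an install-snapshot request from the leader: validates the sender, term and target peer, then
     * delegates the actual installation to the snapshot executor, which replies asynchronously.
     */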
| @Override |
| public Message handleInstallSnapshot(final InstallSnapshotRequest request, final RpcRequestClosure done) { |
| if (this.snapshotExecutor == null) { |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, "Not supported snapshot"); |
| } |
| final PeerId serverId = new PeerId(); |
| if (!serverId.parse(request.serverId())) { |
| LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", getNodeId(), request.serverId()); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Parse serverId failed: %s", request.serverId()); |
| } |
| |
| // Check if a group is started. |
| final PeerId dstPeerId = new PeerId(); |
| if (dstPeerId.parse(request.peerId())) { |
| final String groupId = request.groupId(); |
| final Node node = done.getRpcCtx().getNodeManager().get(groupId, dstPeerId); |
| if (node == null) { |
| return RaftRpcFactory.DEFAULT.newResponse(raftOptions.getRaftMessagesFactory(), RaftError.ENOENT, |
| "Peer id not found: %s, group: %s", request.peerId(), groupId); |
| } |
| } |
| else { |
| return RaftRpcFactory.DEFAULT.newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Fail to parse peerId: %s", request.peerId()); |
| } |
| |
| this.writeLock.lock(); |
| try { |
| if (!this.state.isActive()) { |
| LOG.warn("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", getNodeId(), |
| this.state); |
| return RaftRpcFactory.DEFAULT // |
| .newResponse(raftOptions.getRaftMessagesFactory(), RaftError.EINVAL, |
| "Node %s:%s is not in active state, state %s.", this.groupId, this.serverId, this.state.name()); |
| } |
| |
| if (request.term() < this.currTerm) { |
| LOG.warn("Node {} ignore stale InstallSnapshotRequest from {}, term={}, currTerm={}.", getNodeId(), |
| request.peerId(), request.term(), this.currTerm); |
| return raftOptions.getRaftMessagesFactory() |
| .installSnapshotResponse() |
| .term(this.currTerm) // |
| .success(false) // |
| .build(); |
| } |
| |
| checkStepDown(request.term(), serverId); |
| |
| if (!serverId.equals(this.leaderId)) { |
| LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.", |
| serverId, this.currTerm, this.leaderId); |
| // Increase the term by 1 and make both leaders step down to minimize the |
| // loss of split brain |
| stepDown(request.term() + 1, false, new Status(RaftError.ELEADERCONFLICT, |
| "More than one leader in the same term.")); |
| return raftOptions.getRaftMessagesFactory() |
| .installSnapshotResponse() |
| .term(request.term() + 1) // |
| .success(false) // |
| .build(); |
| } |
| |
| } |
| finally { |
| this.writeLock.unlock(); |
| } |
| final long startMs = Utils.monotonicMs(); |
| try { |
| if (LOG.isInfoEnabled()) { |
| LOG.info( |
| "Node {} received InstallSnapshotRequest from {}, lastIncludedLogIndex={}, lastIncludedLogTerm={}, lastLogId={}.", |
| getNodeId(), request.serverId(), request.meta().lastIncludedIndex(), request.meta() |
| .lastIncludedTerm(), this.logManager.getLastLogId(false)); |
| } |
| this.snapshotExecutor.installSnapshot(request, raftOptions.getRaftMessagesFactory().installSnapshotResponse(), done); |
| return null; |
| } |
| finally { |
| this.metrics.recordLatency("install-snapshot", Utils.monotonicMs() - startMs); |
| } |
| } |
| |
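    /**
     * Re-evaluates the node configuration after a snapshot has been installed by delegating to
     * {@code checkAndSetConfiguration(false)}.
     */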
| public void updateConfigurationAfterInstallingSnapshot() { |
| checkAndSetConfiguration(false); |
| } |
| |
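    /**
     * Stops the replicators of the peers that are present in {@code drop} but absent from {@code keep}, skipping this
     * node itself.
     */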
| private void stopReplicator(final Collection<PeerId> keep, final Collection<PeerId> drop) { |
| if (drop != null) { |
| for (final PeerId peer : drop) { |
| if (!keep.contains(peer) && !peer.equals(this.serverId)) { |
| this.replicatorGroup.stopReplicator(peer); |
| } |
| } |
| } |
| } |
| |
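    /**
     * Reads a committed, user-submitted log entry. Starting from {@code index}, the first entry of type
     * {@code ENTRY_TYPE_DATA} up to the last applied index is returned; configuration changes and other internal
     * entries are skipped, so the returned index may be greater than the requested one. An illustrative usage sketch
     * (variable names are assumptions, not part of this API):
     *
     * <pre>{@code
     * // Read the user payload committed at or after index 42.
     * UserLog userLog = node.readCommittedUserLog(42);
     * long actualIndex = userLog.getIndex();   // may be > 42 when internal entries were skipped
     * ByteBuffer payload = userLog.getData();
     * }</pre>
     */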
| @Override |
| public UserLog readCommittedUserLog(final long index) { |
| if (index <= 0) { |
| throw new LogIndexOutOfBoundsException("Request index is invalid: " + index); |
| } |
| |
| final long savedLastAppliedIndex = this.fsmCaller.getLastAppliedIndex(); |
| |
| if (index > savedLastAppliedIndex) { |
| throw new LogIndexOutOfBoundsException("Request index " + index + " is greater than lastAppliedIndex: " |
| + savedLastAppliedIndex); |
| } |
| |
| long curIndex = index; |
| LogEntry entry = this.logManager.getEntry(curIndex); |
| if (entry == null) { |
| throw new LogNotFoundException("User log is deleted at index: " + index); |
| } |
| |
| do { |
| if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_DATA) { |
| return new UserLog(curIndex, entry.getData()); |
| } |
| else { |
| curIndex++; |
| } |
| if (curIndex > savedLastAppliedIndex) { |
| throw new IllegalStateException("No user log between index:" + index + " and last_applied_index:" |
| + savedLastAppliedIndex); |
| } |
| entry = this.logManager.getEntry(curIndex); |
| } |
| while (entry != null); |
| |
| throw new LogNotFoundException("User log is deleted at index: " + curIndex); |
| } |
| |
| @Override |
| public void addReplicatorStateListener(final Replicator.ReplicatorStateListener replicatorStateListener) { |
| Requires.requireNonNull(replicatorStateListener, "replicatorStateListener"); |
| this.replicatorStateListeners.add(replicatorStateListener); |
| } |
| |
| @Override |
| public void removeReplicatorStateListener(final Replicator.ReplicatorStateListener replicatorStateListener) { |
| Requires.requireNonNull(replicatorStateListener, "replicatorStateListener"); |
| this.replicatorStateListeners.remove(replicatorStateListener); |
| } |
| |
| @Override |
| public void clearReplicatorStateListeners() { |
| this.replicatorStateListeners.clear(); |
| } |
| |
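    /**
     * Returns the internal listener list itself (not a copy); changes to the node's listeners are visible through the
     * returned list.
     */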
| @Override |
| public List<Replicator.ReplicatorStateListener> getReplicatorStateListeners() { |
| return this.replicatorStateListeners; |
| } |
| |
| @Override |
| public int getNodeTargetPriority() { |
| return this.targetPriority; |
| } |
| |
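    /**
     * Prints a human-readable description of the node (state, leader, term, configuration, target priority, timers
     * and core components) to the given printer. The volatile fields are copied under the read lock first, so the
     * lock is not held while the output is written.
     */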
| @Override |
| public void describe(final Printer out) { |
| // node |
| final String _nodeId; |
| final String _state; |
| final String _leaderId; |
| final long _currTerm; |
| final String _conf; |
| final int _targetPriority; |
| this.readLock.lock(); |
| try { |
| _nodeId = String.valueOf(getNodeId()); |
| _state = String.valueOf(this.state); |
| _leaderId = String.valueOf(this.leaderId); |
| _currTerm = this.currTerm; |
| _conf = String.valueOf(this.conf); |
| _targetPriority = this.targetPriority; |
| } |
| finally { |
| this.readLock.unlock(); |
| } |
| out.print("nodeId: ") // |
| .println(_nodeId); |
| out.print("state: ") // |
| .println(_state); |
| out.print("leaderId: ") // |
| .println(_leaderId); |
| out.print("term: ") // |
| .println(_currTerm); |
| out.print("conf: ") // |
| .println(_conf); |
| out.print("targetPriority: ") // |
| .println(_targetPriority); |
| |
| // timers |
| out.println("electionTimer: "); |
| this.electionTimer.describe(out); |
| |
| out.println("voteTimer: "); |
| this.voteTimer.describe(out); |
| |
| out.println("stepDownTimer: "); |
| this.stepDownTimer.describe(out); |
| |
| out.println("snapshotTimer: "); |
| this.snapshotTimer.describe(out); |
| |
| // logManager |
| out.println("logManager: "); |
| this.logManager.describe(out); |
| |
| // fsmCaller |
| out.println("fsmCaller: "); |
| this.fsmCaller.describe(out); |
| |
| // ballotBox |
| out.println("ballotBox: "); |
| this.ballotBox.describe(out); |
| |
| // snapshotExecutor |
| out.println("snapshotExecutor: "); |
| if (this.snapshotExecutor != null) { |
| this.snapshotExecutor.describe(out); |
| } |
| |
| // replicators |
| out.println("replicatorGroup: "); |
| this.replicatorGroup.describe(out); |
| |
| // log storage |
| if (this.logStorage instanceof Describer) { |
| out.println("logStorage: "); |
| ((Describer) this.logStorage).describe(out); |
| } |
| } |
| |
| /** |
| * @return The state. |
| */ |
| public State getState() { |
| return state; |
| } |
| |
| @Override |
| public String toString() { |
| return "JRaftNode [nodeId=" + getNodeId() + "]"; |
| } |
| } |