/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.ratis.server.impl; |
| |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.proto.RaftProtos.RaftPeerRole; |
| import org.apache.ratis.protocol.*; |
| import org.apache.ratis.protocol.exceptions.StateMachineException; |
| import org.apache.ratis.server.RaftConfiguration; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.impl.LeaderElection.Phase; |
| import org.apache.ratis.server.protocol.TermIndex; |
| import org.apache.ratis.server.raftlog.LogProtoUtils; |
| import org.apache.ratis.server.raftlog.RaftLog; |
| import org.apache.ratis.server.raftlog.memory.MemoryRaftLog; |
| import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog; |
| import org.apache.ratis.server.storage.*; |
| import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; |
| import org.apache.ratis.proto.RaftProtos.LogEntryProto; |
| import org.apache.ratis.statemachine.SnapshotInfo; |
| import org.apache.ratis.statemachine.StateMachine; |
| import org.apache.ratis.statemachine.TransactionContext; |
| import org.apache.ratis.util.JavaUtils; |
| import org.apache.ratis.util.MemoizedCheckedSupplier; |
| import org.apache.ratis.util.MemoizedSupplier; |
| import org.apache.ratis.util.TimeDuration; |
| import org.apache.ratis.util.Timestamp; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Consumer; |
| import java.util.function.LongSupplier; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.ratis.server.RaftServer.Division.LOG; |
| |
| /** |
| * Common states of a raft peer. Protected by RaftServer's lock. |
| */ |
| class ServerState { |
| private final RaftGroupMemberId memberId; |
| private final RaftServerImpl server; |
| /** Raft log */ |
| private final MemoizedSupplier<RaftLog> log; |
| /** Raft configuration */ |
| private final ConfigurationManager configurationManager; |
| /** The thread that applies committed log entries to the state machine */ |
| private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater; |
| /** local storage for log and snapshot */ |
| private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage; |
| private final SnapshotManager snapshotManager; |
| private final AtomicReference<Timestamp> lastNoLeaderTime; |
| private final TimeDuration noLeaderTimeout; |
| |
| private final ReadRequests readRequests; |
| |
| /** |
| * Latest term server has seen. |
| * Initialized to 0 on first boot, increases monotonically. |
| */ |
| private final AtomicLong currentTerm = new AtomicLong(); |
| /** |
| * The server ID of the leader for this term. Null means either there is |
| * no leader for this term yet or this server does not know who it is yet. |
| */ |
| private final AtomicReference<RaftPeerId> leaderId = new AtomicReference<>(); |
| /** |
| * Candidate that this peer granted vote for in current term (or null if none) |
| */ |
| private volatile RaftPeerId votedFor; |
| |
| /** |
| * Latest installed snapshot for this server. This maybe different than StateMachine's latest |
| * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately. |
| * Further, this will not get updated when SM does snapshots itself. |
| */ |
| private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>(); |
| |
| ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server, |
| RaftStorage.StartupOption option, RaftProperties prop) { |
| this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId()); |
| this.server = server; |
| Collection<RaftPeer> followerPeers = group.getPeers().stream() |
| .filter(peer -> peer.getStartupRole() == RaftPeerRole.FOLLOWER) |
| .collect(Collectors.toList()); |
| Collection<RaftPeer> listenerPeers = group.getPeers().stream() |
| .filter(peer -> peer.getStartupRole() == RaftPeerRole.LISTENER) |
| .collect(Collectors.toList()); |
| final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder() |
| .setConf(followerPeers, listenerPeers) |
| .build(); |
| configurationManager = new ConfigurationManager(id, initialConf); |
| LOG.info("{}: {}", getMemberId(), configurationManager); |
| |
| final String storageDirName = group.getGroupId().getUuid().toString(); |
| this.raftStorage = MemoizedCheckedSupplier.valueOf( |
| () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop)); |
| |
| this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(), |
| stateMachine.getStateMachineStorage()); |
| |
| // On start the leader is null, start the clock now |
| this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime()); |
| this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop); |
| |
| final LongSupplier getSnapshotIndexFromStateMachine = () -> Optional.ofNullable(stateMachine.getLatestSnapshot()) |
| .map(SnapshotInfo::getIndex) |
| .filter(i -> i >= 0) |
| .orElse(RaftLog.INVALID_LOG_INDEX); |
| this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop)); |
| this.readRequests = new ReadRequests(prop, stateMachine); |
| this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater( |
| stateMachine, server, this, getLog().getSnapshotIndex(), prop, |
| this.readRequests.getAppliedIndexConsumer())); |
| } |
| |
| void initialize(StateMachine stateMachine) throws IOException { |
| // initialize raft storage |
| final RaftStorageImpl storage = raftStorage.get(); |
| // read configuration from the storage |
| Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf); |
| |
| stateMachine.initialize(server.getRaftServer(), getMemberId().getGroupId(), storage); |
| |
| // we cannot apply log entries to the state machine in this step, since we |
| // do not know whether the local log entries have been committed. |
| final RaftStorageMetadata metadata = log.get().loadMetadata(); |
| currentTerm.set(metadata.getTerm()); |
| votedFor = metadata.getVotedFor(); |
| } |
| |
| RaftGroupMemberId getMemberId() { |
| return memberId; |
| } |
| |
| void writeRaftConfiguration(LogEntryProto conf) { |
| getStorage().writeRaftConfiguration(conf); |
| } |
| |
| void start() { |
| // initialize stateMachineUpdater |
| stateMachineUpdater.get().start(); |
| } |
| |
| private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) { |
| try { |
| return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf, |
| getSnapshotIndexFromStateMachine, prop); |
| } catch (IOException e) { |
| throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e); |
| } |
| } |
| |
| private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage, |
| Consumer<LogEntryProto> logConsumer, LongSupplier getSnapshotIndexFromStateMachine, |
| RaftProperties prop) throws IOException { |
| final RaftLog log; |
| if (RaftServerConfigKeys.Log.useMemory(prop)) { |
| log = new MemoryRaftLog(memberId, getSnapshotIndexFromStateMachine, prop); |
| } else { |
| log = SegmentedRaftLog.newBuilder() |
| .setMemberId(memberId) |
| .setServer(server) |
| .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry) |
| .setGetTransactionContext(server::getTransactionContext) |
| .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent) |
| .setStorage(storage) |
| .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine) |
| .setProperties(prop) |
| .build(); |
| } |
| log.open(log.getSnapshotIndex(), logConsumer); |
| return log; |
| } |
| |
| RaftConfigurationImpl getRaftConf() { |
| return configurationManager.getCurrent(); |
| } |
| |
| RaftPeer getCurrentPeer() { |
| return configurationManager.getCurrentPeer(); |
| } |
| |
| long getCurrentTerm() { |
| return currentTerm.get(); |
| } |
| |
| boolean updateCurrentTerm(long newTerm) { |
| final long current = currentTerm.getAndUpdate(curTerm -> Math.max(curTerm, newTerm)); |
| if (newTerm > current) { |
| votedFor = null; |
| setLeader(null, "updateCurrentTerm"); |
| return true; |
| } |
| return false; |
| } |
| |
| RaftPeerId getLeaderId() { |
| return leaderId.get(); |
| } |
| |
| /** |
| * Become a candidate and start leader election |
| */ |
| LeaderElection.ConfAndTerm initElection(Phase phase) throws IOException { |
| setLeader(null, phase); |
| final long term; |
| if (phase == Phase.PRE_VOTE) { |
| term = getCurrentTerm(); |
| } else if (phase == Phase.ELECTION) { |
| term = currentTerm.incrementAndGet(); |
| votedFor = getMemberId().getPeerId(); |
| persistMetadata(); |
| } else { |
| throw new IllegalArgumentException("Unexpected phase " + phase); |
| } |
| return new LeaderElection.ConfAndTerm(getRaftConf(), term); |
| } |
| |
| void persistMetadata() throws IOException { |
| getLog().persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor)); |
| } |
| |
| RaftPeerId getVotedFor() { |
| return votedFor; |
| } |
| |
| /** |
| * Vote for a candidate and update the local state. |
| */ |
| void grantVote(RaftPeerId candidateId) { |
| votedFor = candidateId; |
| setLeader(null, "grantVote"); |
| } |
| |
| void setLeader(RaftPeerId newLeaderId, Object op) { |
| final RaftPeerId oldLeaderId = leaderId.getAndSet(newLeaderId); |
| if (!Objects.equals(oldLeaderId, newLeaderId)) { |
| final String suffix; |
| if (newLeaderId == null) { |
| // reset the time stamp when a null leader is assigned |
| lastNoLeaderTime.set(Timestamp.currentTime()); |
| suffix = ""; |
| } else { |
| final Timestamp previous = lastNoLeaderTime.getAndSet(null); |
| suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms"; |
| server.setFirstElection(op); |
| server.getStateMachine().event().notifyLeaderChanged(getMemberId(), newLeaderId); |
| } |
| LOG.info("{}: change Leader from {} to {} at term {} for {}{}", |
| getMemberId(), oldLeaderId, newLeaderId, getCurrentTerm(), op, suffix); |
| if (newLeaderId != null) { |
| server.onGroupLeaderElected(); |
| } |
| } |
| } |
| |
| boolean shouldNotifyExtendedNoLeader() { |
| return Optional.ofNullable(lastNoLeaderTime.get()) |
| .map(Timestamp::elapsedTime) |
| .filter(t -> t.compareTo(noLeaderTimeout) > 0) |
| .isPresent(); |
| } |
| |
| long getLastLeaderElapsedTimeMs() { |
| return Optional.ofNullable(lastNoLeaderTime.get()).map(Timestamp::elapsedTimeMs).orElse(0L); |
| } |
| |
| void becomeLeader() { |
| setLeader(getMemberId().getPeerId(), "becomeLeader"); |
| } |
| |
| StateMachineUpdater getStateMachineUpdater() { |
| if (!stateMachineUpdater.isInitialized()) { |
| throw new IllegalStateException(getMemberId() + ": stateMachineUpdater is uninitialized."); |
| } |
| return stateMachineUpdater.get(); |
| } |
| |
| RaftLog getLog() { |
| if (!log.isInitialized()) { |
| throw new IllegalStateException(getMemberId() + ": log is uninitialized."); |
| } |
| return log.get(); |
| } |
| |
| TermIndex getLastEntry() { |
| TermIndex lastEntry = getLog().getLastEntryTermIndex(); |
| if (lastEntry == null) { |
| // lastEntry may need to be derived from snapshot |
| SnapshotInfo snapshot = getLatestSnapshot(); |
| if (snapshot != null) { |
| lastEntry = snapshot.getTermIndex(); |
| } |
| } |
| |
| return lastEntry; |
| } |
| |
| void appendLog(TransactionContext operation) throws StateMachineException { |
| getLog().append(currentTerm.get(), operation); |
| Objects.requireNonNull(operation.getLogEntryUnsafe(), "transaction-logEntry"); |
| } |
| |
| /** @return true iff the given peer id is recognized as the leader. */ |
| boolean recognizeLeader(Object op, RaftPeerId peerId, long peerTerm) { |
| final long current = currentTerm.get(); |
| if (peerTerm < current) { |
| LOG.warn("{}: Failed to recognize {} as leader for {} since peerTerm = {} < currentTerm = {}", |
| getMemberId(), peerId, op, peerTerm, current); |
| return false; |
| } |
| final RaftPeerId curLeaderId = getLeaderId(); |
| if (peerTerm == current && curLeaderId != null && !curLeaderId.equals(peerId)) { |
| LOG.warn("{}: Failed to recognize {} as leader for {} since current leader is {} (peerTerm = currentTerm = {})", |
| getMemberId(), peerId, op, curLeaderId, current); |
| return false; |
| } |
| return true; |
| } |
| |
| static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) { |
| if (lastEntry == null) { |
| // If the lastEntry of candidate is null, the proto will transfer an empty TermIndexProto, |
| // then term and index of candidateLastEntry will both be 0. |
| // Besides, candidateLastEntry comes from proto now, it never be null. |
| // But we still check candidateLastEntry == null here, |
| // to avoid candidateLastEntry did not come from proto in future. |
| if (candidateLastEntry == null || |
| (candidateLastEntry.getTerm() == 0 && candidateLastEntry.getIndex() == 0)) { |
| return 0; |
| } |
| return -1; |
| } else if (candidateLastEntry == null) { |
| return 1; |
| } |
| |
| return lastEntry.compareTo(candidateLastEntry); |
| } |
| |
| @Override |
| public String toString() { |
| return getMemberId() + ":t" + currentTerm + ", leader=" + getLeaderId() |
| + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf(); |
| } |
| |
| boolean isConfCommitted() { |
| return getLog().getLastCommittedIndex() >= getRaftConf().getLogEntryIndex(); |
| } |
| |
| void setRaftConf(LogEntryProto entry) { |
| if (entry.hasConfigurationEntry()) { |
| setRaftConf(LogProtoUtils.toRaftConfiguration(entry)); |
| } |
| } |
| |
| void setRaftConf(RaftConfiguration conf) { |
| configurationManager.addConfiguration(conf); |
| server.getServerRpc().addRaftPeers(conf.getAllPeers()); |
| final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER); |
| if (!listeners.isEmpty()) { |
| server.getServerRpc().addRaftPeers(listeners); |
| } |
| LOG.info("{}: set configuration {}", getMemberId(), conf); |
| LOG.trace("{}: {}", getMemberId(), configurationManager); |
| } |
| |
| void updateConfiguration(List<LogEntryProto> entries) { |
| if (entries != null && !entries.isEmpty()) { |
| configurationManager.removeConfigurations(entries.get(0).getIndex()); |
| entries.forEach(this::setRaftConf); |
| } |
| } |
| |
| boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) { |
| if (getLog().updateCommitIndex(majorityIndex, curTerm, isLeader)) { |
| getStateMachineUpdater().notifyUpdater(); |
| return true; |
| } |
| return false; |
| } |
| |
| void notifyStateMachineUpdater() { |
| getStateMachineUpdater().notifyUpdater(); |
| } |
| |
| void reloadStateMachine(TermIndex snapshotTermIndex) { |
| getStateMachineUpdater().reloadStateMachine(); |
| |
| getLog().onSnapshotInstalled(snapshotTermIndex.getIndex()); |
| latestInstalledSnapshot.set(snapshotTermIndex); |
| } |
| |
| void close() { |
| try { |
| if (stateMachineUpdater.isInitialized()) { |
| getStateMachineUpdater().stopAndJoin(); |
| } |
| } catch (Throwable e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e); |
| } |
| |
| try { |
| if (log.isInitialized()) { |
| getLog().close(); |
| } |
| } catch (Throwable e) { |
| LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e); |
| } |
| |
| try { |
| if (raftStorage.isInitialized()) { |
| getStorage().close(); |
| } |
| } catch (Throwable e) { |
| LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e); |
| } |
| } |
| |
| RaftStorageImpl getStorage() { |
| if (!raftStorage.isInitialized()) { |
| throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized."); |
| } |
| return raftStorage.getUnchecked(); |
| } |
| |
| void installSnapshot(InstallSnapshotRequestProto request) throws IOException { |
| // TODO: verify that we need to install the snapshot |
| StateMachine sm = server.getStateMachine(); |
| sm.pause(); // pause the SM to prepare for install snapshot |
| snapshotManager.installSnapshot(request, sm); |
| } |
| |
| private SnapshotInfo getLatestSnapshot() { |
| return server.getStateMachine().getLatestSnapshot(); |
| } |
| |
| long getLatestInstalledSnapshotIndex() { |
| final TermIndex ti = latestInstalledSnapshot.get(); |
| return ti != null? ti.getIndex(): RaftLog.INVALID_LOG_INDEX; |
| } |
| |
| /** |
| * The last index included in either the latestSnapshot or the latestInstalledSnapshot |
| * @return the last snapshot index |
| */ |
| long getSnapshotIndex() { |
| final SnapshotInfo s = getLatestSnapshot(); |
| final long latestSnapshotIndex = s != null ? s.getIndex() : RaftLog.INVALID_LOG_INDEX; |
| return Math.max(latestSnapshotIndex, getLatestInstalledSnapshotIndex()); |
| } |
| |
| long getNextIndex() { |
| final long logNextIndex = getLog().getNextIndex(); |
| final long snapshotNextIndex = getLog().getSnapshotIndex() + 1; |
| return Math.max(logNextIndex, snapshotNextIndex); |
| } |
| |
| long getLastAppliedIndex() { |
| return getStateMachineUpdater().getStateMachineLastAppliedIndex(); |
| } |
| |
| boolean containsTermIndex(TermIndex ti) { |
| Objects.requireNonNull(ti, "ti == null"); |
| |
| if (Optional.ofNullable(latestInstalledSnapshot.get()).filter(ti::equals).isPresent()) { |
| return true; |
| } |
| if (Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent()) { |
| return true; |
| } |
| return getLog().contains(ti); |
| } |
| |
| ReadRequests getReadRequests() { |
| return readRequests; |
| } |
| } |