blob: af241a6331b6e9753a60d008fa56db8a064b53e6 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedCheckedSupplier;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static org.apache.ratis.server.RaftServer.Division.LOG;
* Common states of a raft peer. Protected by RaftServer's lock.
class ServerState {
private final RaftGroupMemberId memberId;
private final RaftServerImpl server;
/** Raft log */
private final MemoizedSupplier<RaftLog> log;
/** Raft configuration */
private final ConfigurationManager configurationManager;
/** The thread that applies committed log entries to the state machine */
private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
/** local storage for log and snapshot */
private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage;
private final SnapshotManager snapshotManager;
private final AtomicReference<Timestamp> lastNoLeaderTime;
private final TimeDuration noLeaderTimeout;
private final ReadRequests readRequests;
* Latest term server has seen.
* Initialized to 0 on first boot, increases monotonically.
private final AtomicLong currentTerm = new AtomicLong();
* The server ID of the leader for this term. Null means either there is
* no leader for this term yet or this server does not know who it is yet.
private final AtomicReference<RaftPeerId> leaderId = new AtomicReference<>();
* Candidate that this peer granted vote for in current term (or null if none)
private volatile RaftPeerId votedFor;
* Latest installed snapshot for this server. This maybe different than StateMachine's latest
* snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately.
* Further, this will not get updated when SM does snapshots itself.
private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>();
ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server,
RaftStorage.StartupOption option, RaftProperties prop) {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
Collection<RaftPeer> followerPeers = group.getPeers().stream()
.filter(peer -> peer.getStartupRole() == RaftPeerRole.FOLLOWER)
Collection<RaftPeer> listenerPeers = group.getPeers().stream()
.filter(peer -> peer.getStartupRole() == RaftPeerRole.LISTENER)
final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder()
.setConf(followerPeers, listenerPeers)
configurationManager = new ConfigurationManager(id, initialConf);"{}: {}", getMemberId(), configurationManager);
final String storageDirName = group.getGroupId().getUuid().toString();
this.raftStorage = MemoizedCheckedSupplier.valueOf(
() -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));
this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(),
// On start the leader is null, start the clock now
this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime());
this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
final LongSupplier getSnapshotIndexFromStateMachine = () -> Optional.ofNullable(stateMachine.getLatestSnapshot())
.filter(i -> i >= 0)
this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
this.readRequests = new ReadRequests(prop, stateMachine);
this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
stateMachine, server, this, getLog().getSnapshotIndex(), prop,
void initialize(StateMachine stateMachine) throws IOException {
// initialize raft storage
final RaftStorageImpl storage = raftStorage.get();
// read configuration from the storage
stateMachine.initialize(server.getRaftServer(), getMemberId().getGroupId(), storage);
// we cannot apply log entries to the state machine in this step, since we
// do not know whether the local log entries have been committed.
final RaftStorageMetadata metadata = log.get().loadMetadata();
votedFor = metadata.getVotedFor();
RaftGroupMemberId getMemberId() {
return memberId;
void writeRaftConfiguration(LogEntryProto conf) {
void start() {
// initialize stateMachineUpdater
private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) {
try {
return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf,
getSnapshotIndexFromStateMachine, prop);
} catch (IOException e) {
throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e);
private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage,
Consumer<LogEntryProto> logConsumer, LongSupplier getSnapshotIndexFromStateMachine,
RaftProperties prop) throws IOException {
final RaftLog log;
if (RaftServerConfigKeys.Log.useMemory(prop)) {
log = new MemoryRaftLog(memberId, getSnapshotIndexFromStateMachine, prop);
} else {
log = SegmentedRaftLog.newBuilder()
}, logConsumer);
return log;
RaftConfigurationImpl getRaftConf() {
return configurationManager.getCurrent();
RaftPeer getCurrentPeer() {
return configurationManager.getCurrentPeer();
long getCurrentTerm() {
return currentTerm.get();
boolean updateCurrentTerm(long newTerm) {
final long current = currentTerm.getAndUpdate(curTerm -> Math.max(curTerm, newTerm));
if (newTerm > current) {
votedFor = null;
setLeader(null, "updateCurrentTerm");
return true;
return false;
RaftPeerId getLeaderId() {
return leaderId.get();
* Become a candidate and start leader election
LeaderElection.ConfAndTerm initElection(Phase phase) throws IOException {
setLeader(null, phase);
final long term;
if (phase == Phase.PRE_VOTE) {
term = getCurrentTerm();
} else if (phase == Phase.ELECTION) {
term = currentTerm.incrementAndGet();
votedFor = getMemberId().getPeerId();
} else {
throw new IllegalArgumentException("Unexpected phase " + phase);
return new LeaderElection.ConfAndTerm(getRaftConf(), term);
void persistMetadata() throws IOException {
getLog().persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
RaftPeerId getVotedFor() {
return votedFor;
* Vote for a candidate and update the local state.
void grantVote(RaftPeerId candidateId) {
votedFor = candidateId;
setLeader(null, "grantVote");
void setLeader(RaftPeerId newLeaderId, Object op) {
final RaftPeerId oldLeaderId = leaderId.getAndSet(newLeaderId);
if (!Objects.equals(oldLeaderId, newLeaderId)) {
final String suffix;
if (newLeaderId == null) {
// reset the time stamp when a null leader is assigned
suffix = "";
} else {
final Timestamp previous = lastNoLeaderTime.getAndSet(null);
suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
server.getStateMachine().event().notifyLeaderChanged(getMemberId(), newLeaderId);
}"{}: change Leader from {} to {} at term {} for {}{}",
getMemberId(), oldLeaderId, newLeaderId, getCurrentTerm(), op, suffix);
if (newLeaderId != null) {
boolean shouldNotifyExtendedNoLeader() {
return Optional.ofNullable(lastNoLeaderTime.get())
.filter(t -> t.compareTo(noLeaderTimeout) > 0)
long getLastLeaderElapsedTimeMs() {
return Optional.ofNullable(lastNoLeaderTime.get()).map(Timestamp::elapsedTimeMs).orElse(0L);
void becomeLeader() {
setLeader(getMemberId().getPeerId(), "becomeLeader");
StateMachineUpdater getStateMachineUpdater() {
if (!stateMachineUpdater.isInitialized()) {
throw new IllegalStateException(getMemberId() + ": stateMachineUpdater is uninitialized.");
return stateMachineUpdater.get();
RaftLog getLog() {
if (!log.isInitialized()) {
throw new IllegalStateException(getMemberId() + ": log is uninitialized.");
return log.get();
TermIndex getLastEntry() {
TermIndex lastEntry = getLog().getLastEntryTermIndex();
if (lastEntry == null) {
// lastEntry may need to be derived from snapshot
SnapshotInfo snapshot = getLatestSnapshot();
if (snapshot != null) {
lastEntry = snapshot.getTermIndex();
return lastEntry;
void appendLog(TransactionContext operation) throws StateMachineException {
getLog().append(currentTerm.get(), operation);
Objects.requireNonNull(operation.getLogEntryUnsafe(), "transaction-logEntry");
/** @return true iff the given peer id is recognized as the leader. */
boolean recognizeLeader(Object op, RaftPeerId peerId, long peerTerm) {
final long current = currentTerm.get();
if (peerTerm < current) {
LOG.warn("{}: Failed to recognize {} as leader for {} since peerTerm = {} < currentTerm = {}",
getMemberId(), peerId, op, peerTerm, current);
return false;
final RaftPeerId curLeaderId = getLeaderId();
if (peerTerm == current && curLeaderId != null && !curLeaderId.equals(peerId)) {
LOG.warn("{}: Failed to recognize {} as leader for {} since current leader is {} (peerTerm = currentTerm = {})",
getMemberId(), peerId, op, curLeaderId, current);
return false;
return true;
static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
if (lastEntry == null) {
// If the lastEntry of candidate is null, the proto will transfer an empty TermIndexProto,
// then term and index of candidateLastEntry will both be 0.
// Besides, candidateLastEntry comes from proto now, it never be null.
// But we still check candidateLastEntry == null here,
// to avoid candidateLastEntry did not come from proto in future.
if (candidateLastEntry == null ||
(candidateLastEntry.getTerm() == 0 && candidateLastEntry.getIndex() == 0)) {
return 0;
return -1;
} else if (candidateLastEntry == null) {
return 1;
return lastEntry.compareTo(candidateLastEntry);
public String toString() {
return getMemberId() + ":t" + currentTerm + ", leader=" + getLeaderId()
+ ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf();
boolean isConfCommitted() {
return getLog().getLastCommittedIndex() >= getRaftConf().getLogEntryIndex();
void setRaftConf(LogEntryProto entry) {
if (entry.hasConfigurationEntry()) {
void setRaftConf(RaftConfiguration conf) {
final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER);
if (!listeners.isEmpty()) {
}"{}: set configuration {}", getMemberId(), conf);
LOG.trace("{}: {}", getMemberId(), configurationManager);
void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) {
if (getLog().updateCommitIndex(majorityIndex, curTerm, isLeader)) {
return true;
return false;
void notifyStateMachineUpdater() {
void reloadStateMachine(TermIndex snapshotTermIndex) {
void close() {
try {
if (stateMachineUpdater.isInitialized()) {
} catch (Throwable e) {
if (e instanceof InterruptedException) {
LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e);
try {
if (log.isInitialized()) {
} catch (Throwable e) {
LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e);
try {
if (raftStorage.isInitialized()) {
} catch (Throwable e) {
LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e);
RaftStorageImpl getStorage() {
if (!raftStorage.isInitialized()) {
throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized.");
return raftStorage.getUnchecked();
void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
// TODO: verify that we need to install the snapshot
StateMachine sm = server.getStateMachine();
sm.pause(); // pause the SM to prepare for install snapshot
snapshotManager.installSnapshot(request, sm);
private SnapshotInfo getLatestSnapshot() {
return server.getStateMachine().getLatestSnapshot();
long getLatestInstalledSnapshotIndex() {
final TermIndex ti = latestInstalledSnapshot.get();
return ti != null? ti.getIndex(): RaftLog.INVALID_LOG_INDEX;
* The last index included in either the latestSnapshot or the latestInstalledSnapshot
* @return the last snapshot index
long getSnapshotIndex() {
final SnapshotInfo s = getLatestSnapshot();
final long latestSnapshotIndex = s != null ? s.getIndex() : RaftLog.INVALID_LOG_INDEX;
return Math.max(latestSnapshotIndex, getLatestInstalledSnapshotIndex());
long getNextIndex() {
final long logNextIndex = getLog().getNextIndex();
final long snapshotNextIndex = getLog().getSnapshotIndex() + 1;
return Math.max(logNextIndex, snapshotNextIndex);
long getLastAppliedIndex() {
return getStateMachineUpdater().getStateMachineLastAppliedIndex();
boolean containsTermIndex(TermIndex ti) {
Objects.requireNonNull(ti, "ti == null");
if (Optional.ofNullable(latestInstalledSnapshot.get()).filter(ti::equals).isPresent()) {
return true;
if (Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent()) {
return true;
return getLog().contains(ti);
ReadRequests getReadRequests() {
return readRequests;