blob: 05cda84bf3a791e48ec517f80c95cf374dee137e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.*;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
/**
* Common states of a raft peer. Protected by RaftServer's lock.
*/
public class ServerState implements Closeable {
/** The id of this server as a member of its raft group (peer id combined with group id). */
private final RaftGroupMemberId memberId;
/** The server owning this state. */
private final RaftServerImpl server;
/** Raft log */
private final RaftLog log;
/** Raft configuration */
private final ConfigurationManager configurationManager;
/** The thread that applies committed log entries to the state machine */
private final StateMachineUpdater stateMachineUpdater;
/** local storage for log and snapshot */
private final RaftStorage storage;
/** Used by {@link #installSnapshot} to install the snapshots carried by install-snapshot requests. */
private final SnapshotManager snapshotManager;
/**
 * The time when the leader was last set to null;
 * it is null while a (non-null) leader is set.
 */
private volatile Timestamp lastNoLeaderTime;
/** The elapsed no-leader time beyond which {@link #shouldNotifyExtendedNoLeader()} returns true. */
private final TimeDuration noLeaderTimeout;
/**
 * Latest term server has seen.
 * Initialized to 0 on first boot, increases monotonically.
 *
 * @see TermIndex#isValidTerm(int)
 */
private final AtomicLong currentTerm = new AtomicLong();
/**
 * The server ID of the leader for this term. Null means either there is
 * no leader for this term yet or this server does not know who it is yet.
 */
private volatile RaftPeerId leaderId;
/**
 * Candidate that this peer granted its vote to in the current term (or null if none).
 */
private volatile RaftPeerId votedFor;
/**
 * Latest installed snapshot for this server. This may be different from the StateMachine's latest
 * snapshot: once we successfully install a snapshot, the SM may not pick it up immediately.
 * Further, this will not get updated when the SM takes snapshots itself.
 */
private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>();
/**
 * Build the state of a raft peer: the initial configuration, the local storage,
 * the state machine, the raft log and the state machine updater, in that order.
 * The updater is only created here; it is started by {@link #start()}.
 *
 * @param id the id of this peer
 * @param group the raft group this peer belongs to; its peers form the initial configuration
 * @param prop the properties used to look up storage dirs, log type and timeouts
 * @param server the server owning this state
 * @param stateMachine the state machine to initialize and to apply log entries to
 * @throws IOException if choosing/opening the storage, initializing the state machine
 *                     or opening the log fails
 */
ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
RaftServerImpl server, StateMachine stateMachine)
throws IOException {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
// the configuration manager must exist before initStatemachine/initRaftLog,
// since both may call setRaftConf(..)
RaftConfiguration initialConf = RaftConfiguration.newBuilder()
.setConf(group.getPeers()).build();
configurationManager = new ConfigurationManager(initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
// use full uuid string to create a subdirectory
final File dir = chooseStorageDir(RaftServerConfigKeys.storageDirs(prop),
group.getGroupId().getUuid().toString());
storage = new RaftStorage(dir, RaftServerConstants.StartupOption.REGULAR);
snapshotManager = new SnapshotManager(storage, id);
// the index of the latest snapshot of the state machine, or an invalid index if there is none
long lastApplied = initStatemachine(stateMachine, group.getGroupId());
// On start the leader is null, start the clock now
leaderId = null;
this.lastNoLeaderTime = Timestamp.currentTime();
this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
// we cannot apply log entries to the state machine in this step, since we
// do not know whether the local log entries have been committed.
this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, lastApplied, prop);
// restore the persisted term and vote from the log metadata
RaftLog.Metadata metadata = log.loadMetadata();
currentTerm.set(metadata.getTerm());
votedFor = metadata.getVotedFor();
stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this, lastApplied, prop);
}
/** @return the id of this server as a member of its raft group. */
RaftGroupMemberId getMemberId() {
  return this.memberId;
}
/**
 * Choose the storage directory of a raft group among the given volumes.
 *
 * If exactly one volume already contains an entry named {@code targetSubDir},
 * that existing entry is returned.  Otherwise, a (not yet created) path
 * {@code <volume>/<targetSubDir>} is returned for the volume currently holding
 * the fewest entries; when several volumes tie, the one listed first wins.
 *
 * A volume appearing more than once in {@code volumes} is inspected only once,
 * so that a repeated volume is not mistaken for two different directories.
 * A volume that cannot be listed (e.g. it does not exist) counts as empty.
 *
 * @param volumes the candidate volumes (parent directories)
 * @param targetSubDir the name of the sub-directory to look for or to create
 * @return the existing directory, or the path to use for a new one
 * @throws IOException if the sub-directory exists in more than one volume,
 *                     or if {@code volumes} is empty
 */
static File chooseStorageDir(List<File> volumes, String targetSubDir) throws IOException {
  // entry count per distinct volume; also used to detect repeated volumes
  final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
  final List<File> existing = new ArrayList<>();
  File leastPopulated = null;
  int leastPopulatedCount = Integer.MAX_VALUE;

  for (File volume : volumes) {
    if (numberOfStorageDirPerVolume.containsKey(volume)) {
      continue; // the same volume was listed twice; do not scan it again
    }
    // listFiles() returns null when the volume is not a readable directory
    final File[] listed = volume.listFiles();
    final File[] dirs = listed != null ? listed : new File[0];
    numberOfStorageDirPerVolume.put(volume, dirs.length);

    for (File dir : dirs) {
      if (targetSubDir.equals(dir.getName())) {
        existing.add(dir);
      }
    }
    // strict comparison: an earlier volume wins a tie
    if (dirs.length < leastPopulatedCount) {
      leastPopulated = volume;
      leastPopulatedCount = dirs.length;
    }
  }

  if (existing.size() > 1) {
    throw new IOException("More than one directories found for " + targetSubDir + ": " + existing);
  }
  if (existing.size() == 1) {
    return existing.get(0);
  }
  if (leastPopulated == null) {
    throw new IOException("No storage directory found.");
  }
  return new File(leastPopulated, targetSubDir);
}
/**
 * Initialize the state machine and, when it already has a snapshot with a
 * non-negative index, restore the raft configuration saved in the storage.
 *
 * @param sm the state machine to initialize
 * @param gid the id of the raft group
 * @return the index of the latest snapshot,
 *         or {@link RaftLog#INVALID_LOG_INDEX} if there is no such snapshot
 * @throws IOException if the state machine or the storage fails
 */
private long initStatemachine(StateMachine sm, RaftGroupId gid)
    throws IOException {
  sm.initialize(server.getProxy(), gid, storage);

  final SnapshotInfo latest = sm.getLatestSnapshot();
  final boolean hasSnapshot = latest != null && latest.getTermIndex().getIndex() >= 0;
  if (hasSnapshot) {
    // get the raft configuration from raft metafile
    final RaftConfiguration saved = storage.readRaftConfiguration();
    if (saved != null) {
      setRaftConf(saved.getLogEntryIndex(), saved);
    }
    return latest.getIndex();
  }
  return RaftLog.INVALID_LOG_INDEX;
}
/** Hand the given configuration log entry to the local storage for writing. */
void writeRaftConfiguration(LogEntryProto conf) {
  this.storage.writeRaftConfiguration(conf);
}
/** Start the state machine updater. */
void start() {
  this.stateMachineUpdater.start();
}
/**
 * Create and open the raft log: a {@link MemoryRaftLog} when the properties ask
 * for an in-memory log, a {@link SegmentedRaftLog} otherwise.
 *
 * @param logConsumer passed to the log when it is opened
 * @param lastIndexInSnapshot passed to the log constructor and to its open call
 * @return the opened log
 * @throws IOException if the log cannot be created or opened
 */
private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage,
    Consumer<LogEntryProto> logConsumer, long lastIndexInSnapshot, RaftProperties prop) throws IOException {
  final boolean inMemory = RaftServerConfigKeys.Log.useMemory(prop);
  final RaftLog raftLog = inMemory
      ? new MemoryRaftLog(memberId, lastIndexInSnapshot, prop)
      : new SegmentedRaftLog(memberId, server, storage, lastIndexInSnapshot, prop);
  raftLog.open(lastIndexInSnapshot, logConsumer);
  return raftLog;
}
/** @return the current configuration held by the configuration manager. */
RaftConfiguration getRaftConf() {
  return this.configurationManager.getCurrent();
}
/** @return the latest term this server has seen. */
public long getCurrentTerm() {
  return this.currentTerm.get();
}
/**
 * Raise the current term to {@code newTerm} if it is larger than the current one.
 * When the term is raised, the vote and the leader are reset.
 *
 * @param newTerm the term to advance to
 * @return true if the term was raised; false if {@code newTerm} is not larger
 */
boolean updateCurrentTerm(long newTerm) {
  // atomically keep the larger of the two terms and learn what the old one was
  final long previous = currentTerm.getAndAccumulate(newTerm, Math::max);
  if (newTerm <= previous) {
    return false;
  }
  votedFor = null;
  setLeader(null, "updateCurrentTerm");
  return true;
}
/** @return the id of the known leader, or null if no leader is known. */
RaftPeerId getLeaderId() {
  return this.leaderId;
}
/** @return true if a leader id is currently set. */
boolean hasLeader() {
  return Objects.nonNull(leaderId);
}
/**
 * Become a candidate and start leader election:
 * vote for this peer itself, clear the leader and move to the next term.
 *
 * @return the new (incremented) term
 */
long initElection() {
  this.votedFor = getMemberId().getPeerId();
  setLeader(null, "initElection");
  final long electionTerm = currentTerm.incrementAndGet();
  return electionTerm;
}
/**
 * Write the current term and the current vote to the log metadata.
 *
 * @throws IOException if the log fails to write the metadata
 */
void persistMetadata() throws IOException {
  final long term = currentTerm.get();
  log.writeMetadata(term, votedFor);
}
/**
 * Vote for a candidate and update the local state:
 * remember the candidate as the vote and clear the leader.
 *
 * @param candidateId the candidate receiving the vote
 */
void grantVote(RaftPeerId candidateId) {
  this.votedFor = candidateId;
  setLeader(null, "grantVote");
}
/**
 * Set the leader id; a no-op if it is equal to the current one.
 *
 * When the leader becomes null, the no-leader clock is restarted.
 * When the leader becomes non-null, the clock is cleared and the time
 * elapsed since the leader was lost is included in the log message.
 *
 * @param newLeaderId the new leader id, or null for "no leader"
 * @param op the name of the operation causing the change, used for logging only
 */
void setLeader(RaftPeerId newLeaderId, String op) {
  if (Objects.equals(leaderId, newLeaderId)) {
    return;
  }

  final String suffix;
  if (newLeaderId == null) {
    // reset the time stamp when a null leader is assigned
    lastNoLeaderTime = Timestamp.currentTime();
    suffix = "";
  } else {
    final Timestamp previous = lastNoLeaderTime;
    lastNoLeaderTime = null;
    // previous is null when the leader changes directly from one peer to another
    // without passing through null; there is no election time to report then.
    suffix = previous == null ? ""
        : ", leader elected after " + previous.elapsedTimeMs() + "ms";
  }
  LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
      getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
  leaderId = newLeaderId;
}
/**
 * @return true if there is currently no leader and the time elapsed since the
 *         leader was lost is strictly longer than the no-leader timeout.
 */
boolean shouldNotifyExtendedNoLeader() {
  final Timestamp since = lastNoLeaderTime;
  if (since == null) {
    return false; // a leader is set
  }
  return since.elapsedTime().compareTo(noLeaderTimeout) > 0;
}
/**
 * @return the milliseconds elapsed since the leader was last set to null,
 *         or 0 while a leader is set.
 */
long getLastLeaderElapsedTimeMs() {
  final Timestamp since = lastNoLeaderTime;
  if (since == null) {
    return 0;
  }
  return since.elapsedTimeMs();
}
/** Record this peer itself as the leader. */
void becomeLeader() {
  setLeader(
      getMemberId().getPeerId(),
      "becomeLeader");
}
/** @return the raft log of this server. */
public RaftLog getLog() {
  return this.log;
}
/**
 * Append the given transaction to the log at the current term.
 *
 * @param operation the transaction to append
 * @throws StateMachineException if the append fails with it
 * @throws NullPointerException if the transaction has no log entry after the append
 */
void appendLog(TransactionContext operation) throws StateMachineException {
  final long term = currentTerm.get();
  log.append(term, operation);
  // the transaction must carry a log entry once it has been appended
  Objects.requireNonNull(operation.getLogEntry());
}
/**
 * Check whether to accept the leader id and term of an incoming AppendEntries rpc.
 * This method only checks; it does not modify any state.
 *
 * @param leaderId the id of the peer claiming to be the leader
 * @param leaderTerm the term of that peer
 * @return true if the check passes
 */
boolean recognizeLeader(RaftPeerId leaderId, long leaderTerm) {
  final long localTerm = currentTerm.get();
  if (leaderTerm != localTerm) {
    // a newer term is always accepted (leader and term are updated later by
    // the caller); a stale term is always rejected
    return leaderTerm > localTerm;
  }
  // same term: accept if no leader has been set for this term yet,
  // otherwise only the leader already known
  final RaftPeerId known = this.leaderId;
  return known == null || known.equals(leaderId);
}
/**
 * Check whether a candidate and its term are acceptable.
 *
 * @param candidateId the id of the candidate
 * @param candidateTerm the term of the candidate
 * @return false if the candidate is not in the current configuration or its
 *         term is older than the current term; true if its term is newer;
 *         for an equal term, true only if this peer has not voted yet or has
 *         already voted for the same candidate
 */
boolean recognizeCandidate(RaftPeerId candidateId, long candidateTerm) {
  if (!getRaftConf().containsInConf(candidateId)) {
    return false;
  }
  final long localTerm = currentTerm.get();
  if (candidateTerm != localTerm) {
    return candidateTerm > localTerm;
  }
  // has not voted yet or this is a retry
  final RaftPeerId voted = votedFor;
  return voted == null || voted.equals(candidateId);
}
/**
 * Compare the last entry of a candidate with the local state, where the local
 * state is the last log entry or the latest snapshot of the state machine,
 * whichever is further ahead.
 *
 * @param candidateLastEntry the term-index of the candidate's last entry, may be null
 * @return true if there is neither a local entry nor a snapshot, or if the
 *         local term-index compares less than or equal to the candidate's;
 *         false otherwise, including when the candidate has no entry but this peer has
 */
boolean isLogUpToDate(TermIndex candidateLastEntry) {
  final TermIndex lastEntry = log.getLastEntryTermIndex();
  // need to take into account snapshot
  final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();

  if (lastEntry == null && snapshot == null) {
    return true; // nothing local at all
  }
  if (candidateLastEntry == null) {
    return false; // the candidate has nothing but this peer has something
  }

  final boolean snapshotIsAhead = lastEntry == null
      || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex());
  final TermIndex localLast = snapshotIsAhead ? snapshot.getTermIndex() : lastEntry;
  return localLast.compareTo(candidateLastEntry) <= 0;
}
/** @return a string with the member id, term, leader, vote, log and configuration. */
@Override
public String toString() {
  final String head = getMemberId() + ":t" + currentTerm;
  final String election = ", leader=" + leaderId + ", voted=" + votedFor;
  final String tail = ", raftlog=" + log + ", conf=" + getRaftConf();
  return head + election + tail;
}
/**
 * @return true if the last committed index of the log has reached
 *         the log entry index of the current configuration.
 */
boolean isConfCommitted() {
  final long committed = getLog().getLastCommittedIndex();
  final long confIndex = getRaftConf().getLogEntryIndex();
  return committed >= confIndex;
}
/**
 * Apply the configuration carried by the given log entry, if it carries one;
 * entries without a configuration are ignored.
 *
 * @param entry the log entry to inspect
 */
void setRaftConf(LogEntryProto entry) {
  if (!entry.hasConfigurationEntry()) {
    return;
  }
  setRaftConf(entry.getIndex(), ServerProtoUtils.toRaftConfiguration(entry));
}
/**
 * Add a configuration at the given log index to the configuration manager
 * and add its peers to the server rpc.
 *
 * @param logIndex the log index of the configuration
 * @param conf the configuration to add
 */
void setRaftConf(long logIndex, RaftConfiguration conf) {
  this.configurationManager.addConfiguration(logIndex, conf);
  // let the rpc layer know the peers of the new configuration
  this.server.getServerRpc().addPeers(conf.getPeers());

  LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
  LOG.trace("{}: {}", getMemberId(), configurationManager);
}
/**
 * Update the configurations according to the given log entries:
 * first ask the configuration manager to remove configurations, passing the
 * index of the first entry, then apply the configuration of each entry in order.
 * A null or empty array is ignored.
 *
 * @param entries the log entries, may be null
 */
void updateConfiguration(LogEntryProto[] entries) {
  if (entries == null || entries.length == 0) {
    return;
  }
  configurationManager.removeConfigurations(entries[0].getIndex());
  for (LogEntryProto entry : entries) {
    setRaftConf(entry);
  }
}
/**
 * Ask the log to update its last committed index and, if the log reports an
 * update, notify the state machine updater.
 *
 * @param majorityIndex the index passed to the log as the new committed index
 * @param currentTerm the term passed to the log along with the index
 * @return the result reported by the log, i.e. true if it updated the index
 */
boolean updateStatemachine(long majorityIndex, long currentTerm) {
  final boolean updated = log.updateLastCommitted(majorityIndex, currentTerm);
  if (updated) {
    stateMachineUpdater.notifyUpdater();
  }
  return updated;
}
/**
 * Ask the log to update its last committed index with the given snapshot
 * index and term, then ask the updater to reload the state machine.
 *
 * @param lastIndexInSnapshot the last index included in the snapshot
 * @param currentTerm the term passed to the log along with the index
 */
void reloadStateMachine(long lastIndexInSnapshot, long currentTerm) {
  this.log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
  this.stateMachineUpdater.reloadStateMachine();
}
/**
 * Stop the state machine updater, then close the log and the storage.
 *
 * The storage is closed even if closing the log fails.  If this thread is
 * interrupted while joining the updater, closing continues and the interrupt
 * status is restored afterwards, so that it neither gets lost nor disturbs
 * the closing of the log and the storage.
 *
 * @throws IOException if closing the log or the storage fails; if both fail,
 *                     the exception from the storage is the one thrown
 */
@Override
public void close() throws IOException {
  boolean interrupted = false;
  try {
    stateMachineUpdater.stopAndJoin();
  } catch (InterruptedException e) {
    interrupted = true;
    LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e);
  }

  try {
    LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex());
    log.close();
  } finally {
    try {
      storage.close();
    } finally {
      if (interrupted) {
        // restore the interrupt status for the callers of close()
        Thread.currentThread().interrupt();
      }
    }
  }
}
/** @return the local storage of this server. */
public RaftStorage getStorage() {
  return this.storage;
}
/**
 * Pause the state machine, install the snapshot chunk carried by the request
 * and record the term-index of that chunk as the latest installed snapshot.
 *
 * @param request the install-snapshot request
 * @throws IOException if installing the snapshot fails
 */
void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
  // TODO: verify that we need to install the snapshot
  final StateMachine stateMachine = server.getStateMachine();
  // pause the SM to prepare for install snapshot
  stateMachine.pause();
  snapshotManager.installSnapshot(stateMachine, request);
  updateInstalledSnapshotIndex(
      ServerProtoUtils.toTermIndex(request.getSnapshotChunk().getTermIndex()));
}
/**
 * Sync the log with the index of the given snapshot
 * and remember the snapshot as the latest installed one.
 *
 * @param lastTermIndexInSnapshot the last term-index included in the snapshot
 */
void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
  final long snapshotIndex = lastTermIndexInSnapshot.getIndex();
  log.syncWithSnapshot(snapshotIndex);
  latestInstalledSnapshot.set(lastTermIndexInSnapshot);
}
/** @return the latest snapshot reported by the state machine of the server. */
SnapshotInfo getLatestSnapshot() {
  return this.server.getStateMachine().getLatestSnapshot();
}
/**
 * @return the index of the latest installed snapshot,
 *         or 0 if no snapshot has been installed.
 */
public long getLatestInstalledSnapshotIndex() {
  final TermIndex installed = latestInstalledSnapshot.get();
  if (installed == null) {
    return 0L;
  }
  return installed.getIndex();
}
/**
 * The last index included in either the latestSnapshot or the latestInstalledSnapshot,
 * whichever is larger.  A missing state machine snapshot counts as index 0.
 *
 * @return the last snapshot index
 */
long getSnapshotIndex() {
  final SnapshotInfo latest = getLatestSnapshot();
  final long latestIndex = latest == null ? 0 : latest.getIndex();
  final long installedIndex = getLatestInstalledSnapshotIndex();
  if (latestIndex >= installedIndex) {
    return latestIndex;
  }
  return installedIndex;
}
/**
 * @return the larger of the next index of the log and the index right after
 *         the last snapshot index.
 */
// NOTE(review): with neither a state machine snapshot nor an installed snapshot the
// snapshot index is 0, so this returns at least 1 - confirm that this is intended.
public long getNextIndex() {
  final long afterLog = log.getNextIndex();
  final long afterSnapshot = getSnapshotIndex() + 1;
  return afterLog > afterSnapshot ? afterLog : afterSnapshot;
}
/** @return the last applied index reported by the state machine updater. */
public long getLastAppliedIndex() {
  return this.stateMachineUpdater.getLastAppliedIndex();
}
/**
 * Check whether the given term-index is known to this server.
 *
 * @param ti the term-index to look for, must not be null
 * @return true if it equals the latest installed snapshot, or equals the
 *         term-index of the latest state machine snapshot, or is contained in the log
 * @throws NullPointerException if {@code ti} is null
 */
boolean containsTermIndex(TermIndex ti) {
  Objects.requireNonNull(ti, "ti == null");

  final TermIndex installed = latestInstalledSnapshot.get();
  if (installed != null && ti.equals(installed)) {
    return true;
  }

  final SnapshotInfo latest = getLatestSnapshot();
  if (latest != null) {
    final TermIndex snapshotTermIndex = latest.getTermIndex();
    if (snapshotTermIndex != null && ti.equals(snapshotTermIndex)) {
      return true;
    }
  }

  return log.contains(ti);
}
}