blob: 13a05c509fc39c7f8ab14a6e94cf878e24307aca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
/**
* Base class of RaftLog. Currently we provide two types of RaftLog
* implementation:
* 1. MemoryRaftLog: all the log entries are stored in memory. This is only used
* for testing.
* 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
* in segments.
*/
public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
  public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
  /** Name used by implementations for log-sync related metrics/timers. */
  public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync";
  // Callbacks passed to RaftLogIndex updates so index changes are logged at the
  // appropriate level: INFO for rare events (open/purge), TRACE for frequent ones (commit).
  private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getName(), s);
  private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getName(), s);
  /** The least valid log index, i.e. the index used when writing to an empty log. */
  public static final long LEAST_VALID_LOG_INDEX = 0L;
  /** Sentinel (-1) meaning "no valid index". */
  public static final long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
  /** Display name: memberId + simple class name; see {@link #getName()}. */
  private final String name;
  /**
   * The largest committed index. Note the last committed log may be included
   * in the latest snapshot file.
   */
  private final RaftLogIndex commitIndex;
  /** The highest index that has been purged; advances monotonically in {@link #purge(long)}. */
  private final RaftLogIndex purgeIndex;
  /** Minimum distance between two purges; smaller suggestions are ignored. */
  private final int purgeGap;
  private final RaftGroupMemberId memberId;
  /** Max serialized size of a single log entry; larger appends fail. */
  private final int maxBufferSize;
  // Fair read/write lock guarding index/entry consistency across readers and appenders.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
  // Enforces strictly sequential execution of append/truncate operations.
  private final Runner runner = new Runner(this::getName);
  private final OpenCloseState state;
  // Cache of the last appended metadata (commit-index) entry; volatile since it is
  // written under the write lock but read without it in shouldAppendMetadata().
  private volatile LogEntryProto lastMetadataEntry = null;
  /**
   * @param memberId this server's group member id (used for naming and exceptions).
   * @param commitIndex the initial commit index (e.g. restored from a snapshot).
   * @param properties configuration source for purge gap and append buffer limit.
   */
  protected RaftLog(RaftGroupMemberId memberId, long commitIndex, RaftProperties properties) {
    this.name = memberId + "-" + getClass().getSimpleName();
    this.memberId = memberId;
    this.commitIndex = new RaftLogIndex("commitIndex", commitIndex);
    this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
    this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
    this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
    this.state = new OpenCloseState(getName());
  }
  /** @return the largest committed log index. */
  public long getLastCommittedIndex() {
    return commitIndex.get();
  }
  /** Assert that this log is open; throws if it is not. */
  public void checkLogState() {
    state.assertOpen();
  }
  /** @return true iff this log has been opened (and not yet closed). */
  public boolean isOpened() {
    return state.isOpened();
  }
  /**
   * Update the last committed index.
   * @param majorityIndex the index that has achieved majority.
   * @param currentTerm the current term.
   * @return true if update is applied; otherwise, return false, i.e. no update required.
   */
  public boolean updateLastCommitted(long majorityIndex, long currentTerm) {
    try(AutoCloseableLock writeLock = writeLock()) {
      final long oldCommittedIndex = getLastCommittedIndex();
      if (oldCommittedIndex < majorityIndex) {
        // Only update last committed index for current term. See §5.4.2 in
        // paper for details.
        final TermIndex entry = getTermIndex(majorityIndex);
        if (entry != null && entry.getTerm() == currentTerm) {
          // Never commit beyond what has been flushed to stable storage.
          final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
          if (newCommitIndex > oldCommittedIndex) {
            commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
          }
          // NOTE(review): returns true even when newCommitIndex <= oldCommittedIndex
          // (flush index lagging), which disagrees with the javadoc above — confirm intent.
          return true;
        }
      }
    }
    return false;
  }
  /**
   * Does the log contains the given term and index? Used to check the
   * consistency between the local log of a follower and the log entries sent
   * by the leader.
   */
  public boolean contains(TermIndex ti) {
    Objects.requireNonNull(ti, "ti == null");
    return ti.equals(getTermIndex(ti.getIndex()));
  }
  /**
   * @return the index of the next log entry to append.
   */
  public long getNextIndex() {
    final TermIndex last = getLastEntryTermIndex();
    if (last == null) {
      // if the log is empty, the last committed index should be consistent with
      // the last index included in the latest snapshot.
      return getLastCommittedIndex() + 1;
    }
    return last.getIndex() + 1;
  }
  /** Append a transaction sequentially; see {@link #appendImpl(long, TransactionContext)}. */
  @Override
  public final long append(long term, TransactionContext transaction) throws StateMachineException {
    return runner.runSequentially(() -> appendImpl(term, transaction));
  }
  /**
   * Build a log entry from the transaction (after the state machine's pre-append
   * callback) and append it at the next index, under the write lock.
   * @throws StateMachineException if pre-append fails or the entry exceeds the buffer limit.
   */
  private long appendImpl(long term, TransactionContext operation) throws StateMachineException {
    checkLogState();
    try(AutoCloseableLock writeLock = writeLock()) {
      final long nextIndex = getNextIndex();
      // This is called here to guarantee strict serialization of callback executions in case
      // the SM wants to attach a logic depending on ordered execution in the log commit order.
      try {
        operation = operation.preAppendTransaction();
      } catch (IOException e) {
        throw new StateMachineException(memberId, e);
      }
      // build the log entry after calling the StateMachine
      final LogEntryProto e = operation.initLogEntry(term, nextIndex);
      int entrySize = e.getSerializedSize();
      if (entrySize > maxBufferSize) {
        throw new StateMachineException(memberId, new RaftLogIOException(
            "Log entry size " + entrySize + " exceeds the max buffer limit of " + maxBufferSize));
      }
      appendEntry(e);
      return nextIndex;
    }
  }
  /** Append a metadata (commit-index) entry sequentially; may be skipped, see below. */
  @Override
  public final long appendMetadata(long term, long newCommitIndex) {
    return runner.runSequentially(() -> appendMetadataImpl(term, newCommitIndex));
  }
  /**
   * Append a metadata entry recording newCommitIndex, unless
   * {@link #shouldAppendMetadata(long)} decides it is redundant.
   * @return the index of the appended entry, or {@link #INVALID_LOG_INDEX} if skipped.
   */
  private long appendMetadataImpl(long term, long newCommitIndex) {
    checkLogState();
    if (!shouldAppendMetadata(newCommitIndex)) {
      return INVALID_LOG_INDEX;
    }
    final LogEntryProto entry;
    final long nextIndex;
    try(AutoCloseableLock writeLock = writeLock()) {
      nextIndex = getNextIndex();
      entry = ServerProtoUtils.toLogEntryProto(newCommitIndex, term, nextIndex);
      appendEntry(entry);
    }
    // Cache outside the lock; volatile write publishes it to other readers.
    lastMetadataEntry = entry;
    return nextIndex;
  }
  /** @return false when appending a metadata entry for newCommitIndex would be redundant. */
  private boolean shouldAppendMetadata(long newCommitIndex) {
    if (newCommitIndex <= 0) {
      // do not log the first conf entry
      return false;
    } else if (Optional.ofNullable(lastMetadataEntry)
        .filter(e -> e.getIndex() == newCommitIndex || e.getMetadataEntry().getCommitIndex() >= newCommitIndex)
        .isPresent()) {
      //log neither lastMetadataEntry, nor entries with a smaller commit index.
      return false;
    }
    try {
      if (get(newCommitIndex).hasMetadataEntry()) {
        // do not log the metadata entry
        return false;
      }
    } catch(RaftLogIOException e) {
      // Best effort: if the lookup fails we still allow the append below.
      LOG.error("Failed to get log entry for index " + newCommitIndex, e);
    }
    return true;
  }
  /** Append a configuration entry sequentially; see {@link #appendImpl(long, RaftConfiguration)}. */
  @Override
  public final long append(long term, RaftConfiguration configuration) {
    return runner.runSequentially(() -> appendImpl(term, configuration));
  }
  /** Append newConf as a configuration entry at the next index, under the write lock. */
  private long appendImpl(long term, RaftConfiguration newConf) {
    checkLogState();
    try(AutoCloseableLock writeLock = writeLock()) {
      final long nextIndex = getNextIndex();
      final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term,
          nextIndex);
      appendEntry(e);
      return nextIndex;
    }
  }
  /**
   * Open the log: replay existing entries (metadata entries update the cached
   * lastMetadataEntry/commit index; all others are passed to the consumer),
   * then mark the log open and initialize the purge index from the start index.
   * @param lastIndexInSnapshot the last index covered by the latest snapshot.
   * @param consumer receives each non-metadata entry during replay; may be null.
   */
  public final void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
    openImpl(lastIndexInSnapshot, e -> {
      if (e.hasMetadataEntry()) {
        lastMetadataEntry = e;
      } else if (consumer != null) {
        consumer.accept(e);
      }
    });
    // Restore the commit index from the last replayed metadata entry, if any.
    Optional.ofNullable(lastMetadataEntry).ifPresent(
        e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
    state.open();
    final long startIndex = getStartIndex();
    if (startIndex > LEAST_VALID_LOG_INDEX) {
      // Everything before the start index has effectively been purged already.
      purgeIndex.updateIncreasingly(startIndex - 1, infoIndexChange);
    }
  }
  /** Implementation hook for {@link #open(long, Consumer)}; default is a no-op. */
  protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
  }
  /** @return the index of the first entry available in this log. */
  public abstract long getStartIndex();
  /**
   * Get the log entry of the given index.
   *
   * @param index The given index.
   * @return The log entry associated with the given index.
   * Null if there is no log entry with the index.
   */
  public abstract LogEntryProto get(long index) throws RaftLogIOException;
  /**
   * Get the log entry of the given index along with the state machine data.
   *
   * @param index The given index.
   * @return The log entry associated with the given index.
   * Null if there is no log entry with the index.
   */
  public abstract EntryWithData getEntryWithData(long index) throws RaftLogIOException;
  /**
   * Get the TermIndex information of the given index.
   *
   * @param index The given index.
   * @return The TermIndex of the log entry associated with the given index.
   * Null if there is no log entry with the index.
   */
  public abstract TermIndex getTermIndex(long index);
  /**
   * @param startIndex the starting log index (inclusive)
   * @param endIndex the ending log index (exclusive)
   * @return TermIndex of all log entries within the given index range. Null if
   * startIndex is greater than the smallest available index.
   */
  public abstract TermIndex[] getEntries(long startIndex, long endIndex);
  /**
   * @return the last log entry's term and index.
   */
  public abstract TermIndex getLastEntryTermIndex();
  /**
   * Validate the term and index of entry w.r.t RaftLog
   */
  protected void validateLogEntry(LogEntryProto entry) {
    if (entry.hasMetadataEntry()) {
      // Metadata entries are internal and exempt from term/index validation.
      return;
    }
    TermIndex lastTermIndex = getLastEntryTermIndex();
    if (lastTermIndex != null) {
      // Terms must be non-decreasing and indices strictly consecutive.
      Preconditions.assertTrue(entry.getTerm() >= lastTermIndex.getTerm(),
          "Entry term less than RaftLog's last term: %d, entry: %s", lastTermIndex.getTerm(), entry);
      Preconditions.assertTrue(entry.getIndex() == lastTermIndex.getIndex() + 1,
          "Difference between entry index and RaftLog's last index %d greater than 1, entry: %s", lastTermIndex.getIndex(), entry);
    }
  }
  /** Truncate the log sequentially at the given index (inclusive). */
  @Override
  public final CompletableFuture<Long> truncate(long index) {
    return runner.runSequentially(() -> truncateImpl(index));
  }
  protected abstract CompletableFuture<Long> truncateImpl(long index);
  /**
   * Purge asynchronously the log transactions.
   * The implementation may choose to purge an index other than the suggested index.
   *
   * @param suggestedIndex the suggested index (inclusive) to be purged.
   * @return the future of the actual purged log index.
   */
  public final CompletableFuture<Long> purge(long suggestedIndex) {
    final long lastPurge = purgeIndex.get();
    if (suggestedIndex - lastPurge < purgeGap) {
      // Too close to the previous purge; skip and report the old purge index.
      return CompletableFuture.completedFuture(lastPurge);
    }
    LOG.info("{}: purge {}", getName(), suggestedIndex);
    return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
      if (purged != null) {
        purgeIndex.updateToMax(purged, infoIndexChange);
      }
      if (e != null) {
        // Purge failure is non-fatal; it will be retried on a later suggestion.
        LOG.warn(getName() + ": Failed to purge " + suggestedIndex, e);
      }
    });
  }
  protected abstract CompletableFuture<Long> purgeImpl(long index);
  /** Append a single pre-built entry sequentially. */
  @Override
  public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
    return runner.runSequentially(() -> appendEntryImpl(entry));
  }
  protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry);
  /** Append multiple entries sequentially (e.g. entries received from the leader). */
  @Override
  public final List<CompletableFuture<Long>> append(LogEntryProto... entries) {
    return runner.runSequentially(() -> appendImpl(entries));
  }
  protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
  /**
   * @return the index of the last entry that has been flushed to the local storage.
   */
  public abstract long getFlushIndex();
  /**
   * Write and flush the metadata (votedFor and term) into the meta file.
   *
   * We need to guarantee that the order of writeMetadata calls is the same with
   * that when we change the in-memory term/votedFor. Otherwise we may persist
   * stale term/votedFor in file.
   *
   * Since the leader change is not frequent, currently we simply put this call
   * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
   * order.
   */
  public abstract void writeMetadata(long term, RaftPeerId votedFor)
      throws IOException;
  /** Load the persisted (votedFor, term) metadata; counterpart of {@link #writeMetadata}. */
  public abstract Metadata loadMetadata() throws IOException;
  /** Synchronize this log's state with the latest snapshot index. */
  public abstract void syncWithSnapshot(long lastSnapshotIndex);
  /** @return true iff the entry at the given TermIndex is a configuration entry. */
  public abstract boolean isConfigEntry(TermIndex ti);
  @Override
  public String toString() {
    return getName() + ":" + state + ":c" + getLastCommittedIndex();
  }
  /** Immutable holder for the persisted vote metadata: (votedFor, term). */
  public static class Metadata {
    private final RaftPeerId votedFor;
    private final long term;
    public Metadata(RaftPeerId votedFor, long term) {
      this.votedFor = votedFor;
      this.term = term;
    }
    public RaftPeerId getVotedFor() {
      return votedFor;
    }
    public long getTerm() {
      return term;
    }
  }
  /** Acquire the read lock; release by closing the returned object (try-with-resources). */
  public AutoCloseableLock readLock() {
    return AutoCloseableLock.acquire(lock.readLock());
  }
  /** Acquire the write lock; release by closing the returned object (try-with-resources). */
  public AutoCloseableLock writeLock() {
    return AutoCloseableLock.acquire(lock.writeLock());
  }
  public boolean hasWriteLock() {
    return this.lock.isWriteLockedByCurrentThread();
  }
  // The write lock counts too, since a writer may also read.
  public boolean hasReadLock() {
    return this.lock.getReadHoldCount() > 0 || hasWriteLock();
  }
  @Override
  public void close() throws IOException {
    state.close();
  }
  public String getName() {
    return name;
  }
  /**
   * Holds proto entry along with future which contains read state machine data
   */
  public class EntryWithData {
    private final LogEntryProto logEntry;
    // Future for asynchronously-read state machine data; null when the entry
    // already contains all its data.
    private final CompletableFuture<ByteString> future;
    public EntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
      this.logEntry = logEntry;
      this.future = future;
    }
    public long getIndex() {
      return logEntry.getIndex();
    }
    public int getSerializedSize() {
      return ServerProtoUtils.getSerializedSize(logEntry);
    }
    /**
     * Combine the entry with its state machine data, waiting up to the timeout.
     * @throws TimeoutException if the data is not available within the timeout.
     * @throws RaftLogIOException if reading fails or the data ends up unset.
     */
    public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
      LogEntryProto entryProto;
      if (future == null) {
        return logEntry;
      }
      try {
        entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry))
            .get(timeout.getDuration(), timeout.getUnit());
      } catch (TimeoutException t) {
        throw t;
      } catch (Throwable t) {
        final String err = getName() + ": Failed readStateMachineData for " +
            ServerProtoUtils.toLogEntryString(logEntry);
        LOG.error(err, t);
        throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(t));
      }
      // by this time we have already read the state machine data,
      // so the log entry data should be set now
      if (ServerProtoUtils.shouldReadStateMachineData(entryProto)) {
        final String err = getName() + ": State machine data not set for " +
            ServerProtoUtils.toLogEntryString(logEntry);
        LOG.error(err);
        throw new RaftLogIOException(err);
      }
      return entryProto;
    }
    @Override
    public String toString() {
      return ServerProtoUtils.toLogEntryString(logEntry);
    }
  }
}