/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
/**
* Base class of RaftLog. Currently there are two RaftLog implementations:
* 1. MemoryRaftLog: all the log entries are stored in memory; used only for testing.
* 2. SegmentedRaftLog: the log entries are persisted on disk and stored in segments.
*/
public abstract class RaftLogBase implements RaftLog {
private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", getName(), s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", getName(), s);
/** The least valid log index, i.e. the index used when writing to an empty log. */
public static final long LEAST_VALID_LOG_INDEX = 0L;
public static final long INVALID_LOG_INDEX = LEAST_VALID_LOG_INDEX - 1;
private final String name;
/**
* The largest committed index. Note that the last committed log entry may already be
* included in the latest snapshot file.
*/
private final RaftLogIndex commitIndex;
/** The last log index included in the snapshot. */
private final RaftLogIndex snapshotIndex;
private final RaftLogIndex purgeIndex;
private final int purgeGap;
private final RaftGroupMemberId memberId;
private final int maxBufferSize;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final Runner runner = new Runner(this::getName);
private final OpenCloseState state;
private final LongSupplier getSnapshotIndexFromStateMachine;
private final TimeDuration stateMachineDataReadTimeout;
private final long purgePreservation;
private volatile LogEntryProto lastMetadataEntry = null;
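/**
 * Construct the base log for the given group member.
 *
 * @param memberId the id of this server in the group; used to name this log.
 * @param getSnapshotIndexFromStateMachine supplier of the latest snapshot index; its current
 *        value initializes both the commit index and the snapshot index.
 * @param properties configuration source for the purge gap, purge preservation,
 *        append buffer byte limit and state machine data read timeout.
 */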
protected RaftLogBase(RaftGroupMemberId memberId,
LongSupplier getSnapshotIndexFromStateMachine,
RaftProperties properties) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
this.memberId = memberId;
long index = getSnapshotIndexFromStateMachine.getAsLong();
this.commitIndex = new RaftLogIndex("commitIndex", index);
this.snapshotIndex = new RaftLogIndex("snapshotIndex", index);
this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
this.state = new OpenCloseState(getName());
this.getSnapshotIndexFromStateMachine = getSnapshotIndexFromStateMachine;
this.stateMachineDataReadTimeout = RaftServerConfigKeys.Log.StateMachineData.readTimeout(properties);
this.purgePreservation = RaftServerConfigKeys.Log.purgePreservationLogNum(properties);
}
@Override
public long getLastCommittedIndex() {
return commitIndex.get();
}
@Override
public long getSnapshotIndex() {
return snapshotIndex.get();
}
public void checkLogState() {
state.assertOpen();
}
/** Is this log already opened? */
public boolean isOpened() {
return state.isOpened();
}
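/**
 * Update the commit index to min(majorityIndex, flush index).
 * A leader only advances the commit index when the entry at the new index belongs to the
 * current term (see §5.4.2 of the Raft paper); a follower advances it unconditionally.
 * For example (a sketch): a leader in term 3 does not advance past an entry appended in
 * term 2 until an entry from term 3 has been replicated to a majority.
 *
 * @return true if the commit index was advanced.
 */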
@Override
public boolean updateCommitIndex(long majorityIndex, long currentTerm, boolean isLeader) {
try(AutoCloseableLock writeLock = writeLock()) {
final long oldCommittedIndex = getLastCommittedIndex();
final long newCommitIndex = Math.min(majorityIndex, getFlushIndex());
if (oldCommittedIndex < newCommitIndex) {
if (!isLeader) {
commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
return true;
}
// Only update the last committed index for the current term. See §5.4.2 in the Raft paper for details.
final TermIndex entry = getTermIndex(newCommitIndex);
if (entry != null && entry.getTerm() == currentTerm) {
commitIndex.updateIncreasingly(newCommitIndex, traceIndexChange);
return true;
}
}
}
return false;
}
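/**
 * Update the snapshot index (and, if it lags behind, the commit index)
 * from the state machine's latest snapshot index.
 */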
protected void updateSnapshotIndexFromStateMachine() {
updateSnapshotIndex(getSnapshotIndexFromStateMachine.getAsLong());
}
@Override
public void updateSnapshotIndex(long newSnapshotIndex) {
try(AutoCloseableLock writeLock = writeLock()) {
final long oldSnapshotIndex = getSnapshotIndex();
if (oldSnapshotIndex < newSnapshotIndex) {
snapshotIndex.updateIncreasingly(newSnapshotIndex, infoIndexChange);
}
final long oldCommitIndex = getLastCommittedIndex();
if (oldCommitIndex < newSnapshotIndex) {
commitIndex.updateIncreasingly(newSnapshotIndex, traceIndexChange);
}
}
}
@Override
public final long append(long term, TransactionContext transaction) throws StateMachineException {
return runner.runSequentially(() -> appendImpl(term, transaction));
}
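/**
 * Append a transaction under the write lock: call preAppendTransaction on the context,
 * build the log entry at the next index, and reject entries larger than the append buffer limit.
 */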
private long appendImpl(long term, TransactionContext operation) throws StateMachineException {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
final long nextIndex = getNextIndex();
// This is called here to guarantee strict serialization of callback executions in case
// the state machine attaches logic that depends on execution in log commit order.
try {
operation = operation.preAppendTransaction();
} catch (StateMachineException e) {
throw e;
} catch (IOException e) {
throw new StateMachineException(memberId, e);
}
// build the log entry after calling the StateMachine
final LogEntryProto e = operation.initLogEntry(term, nextIndex);
int entrySize = e.getSerializedSize();
if (entrySize > maxBufferSize) {
throw new StateMachineException(memberId, new RaftLogIOException(
"Log entry size " + entrySize + " exceeds the max buffer limit of " + maxBufferSize));
}
appendEntry(e);
return nextIndex;
}
}
@Override
public final long appendMetadata(long term, long newCommitIndex) {
return runner.runSequentially(() -> appendMetadataImpl(term, newCommitIndex));
}
private long appendMetadataImpl(long term, long newCommitIndex) {
checkLogState();
if (!shouldAppendMetadata(newCommitIndex)) {
return INVALID_LOG_INDEX;
}
final LogEntryProto entry;
final long nextIndex;
try(AutoCloseableLock writeLock = writeLock()) {
nextIndex = getNextIndex();
entry = LogProtoUtils.toLogEntryProto(newCommitIndex, term, nextIndex);
appendEntry(entry);
}
lastMetadataEntry = entry;
return nextIndex;
}
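/**
 * Should a metadata entry be appended for the given commit index?
 * Skip it when nothing has been committed yet, when the last metadata entry already covers
 * the index, or when the entry at that index is itself a metadata entry.
 */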
private boolean shouldAppendMetadata(long newCommitIndex) {
if (newCommitIndex <= 0) {
// do not log a metadata entry for the initial conf entry at index 0
return false;
} else if (Optional.ofNullable(lastMetadataEntry)
.filter(e -> e.getIndex() == newCommitIndex || e.getMetadataEntry().getCommitIndex() >= newCommitIndex)
.isPresent()) {
// do not log the lastMetadataEntry itself, nor a commit index it already covers.
return false;
}
try {
if (get(newCommitIndex).hasMetadataEntry()) {
// the entry at newCommitIndex is itself a metadata entry; do not log metadata for it
return false;
}
} catch(RaftLogIOException e) {
LOG.error("Failed to get log entry for index " + newCommitIndex, e);
}
return true;
}
@Override
public final long append(long term, RaftConfiguration configuration) {
return runner.runSequentially(() -> appendImpl(term, configuration));
}
private long appendImpl(long term, RaftConfiguration newConf) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
final long nextIndex = getNextIndex();
appendEntry(LogProtoUtils.toLogEntryProto(newConf, term, nextIndex));
return nextIndex;
}
}
@Override
public final void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
openImpl(lastIndexInSnapshot, e -> {
if (e.hasMetadataEntry()) {
lastMetadataEntry = e;
} else if (consumer != null) {
consumer.accept(e);
}
});
Optional.ofNullable(lastMetadataEntry).ifPresent(
e -> commitIndex.updateToMax(e.getMetadataEntry().getCommitIndex(), infoIndexChange));
state.open();
final long startIndex = getStartIndex();
if (startIndex > LEAST_VALID_LOG_INDEX) {
purgeIndex.updateIncreasingly(startIndex - 1, infoIndexChange);
}
}
protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
}
/**
* Validate the term and index of the entry with respect to this RaftLog.
*/
protected void validateLogEntry(LogEntryProto entry) {
if (entry.hasMetadataEntry()) {
return;
}
long latestSnapshotIndex = getSnapshotIndex();
TermIndex lastTermIndex = getLastEntryTermIndex();
if (lastTermIndex != null) {
long lastIndex = lastTermIndex.getIndex() > latestSnapshotIndex ?
lastTermIndex.getIndex() : latestSnapshotIndex;
Preconditions.assertTrue(entry.getTerm() >= lastTermIndex.getTerm(),
"Entry term less than RaftLog's last term: %d, entry: %s", lastTermIndex.getTerm(), entry);
Preconditions.assertTrue(entry.getIndex() == lastIndex + 1,
"Difference between entry index and RaftLog's last index %d (or snapshot index %d) " +
"is greater than 1, entry: %s",
lastTermIndex.getIndex(), latestSnapshotIndex, entry);
} else {
Preconditions.assertTrue(entry.getIndex() == latestSnapshotIndex + 1,
"Difference between entry index and RaftLog's latest snapshot index %d is greater than 1 " +
"and in between log entries are not present, entry: %s",
latestSnapshotIndex, entry);
}
}
@Override
public final CompletableFuture<Long> truncate(long index) {
return runner.runSequentially(() -> truncateImpl(index));
}
protected abstract CompletableFuture<Long> truncateImpl(long index);
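/**
 * Purge the log asynchronously up to the suggested index, keeping at least
 * purgePreservation entries and skipping the purge entirely when the suggested index is
 * within purgeGap of the last purge.
 */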
@Override
public final CompletableFuture<Long> purge(long suggestedIndex) {
if (purgePreservation > 0) {
final long currentIndex = getNextIndex() - 1;
suggestedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
}
final long lastPurge = purgeIndex.get();
if (suggestedIndex - lastPurge < purgeGap) {
return CompletableFuture.completedFuture(lastPurge);
}
LOG.info("{}: purge {}", getName(), suggestedIndex);
final long finalSuggestedIndex = suggestedIndex;
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
if (purged != null) {
purgeIndex.updateToMax(purged, infoIndexChange);
}
if (e != null) {
LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
}
});
}
protected abstract CompletableFuture<Long> purgeImpl(long index);
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
return runner.runSequentially(() -> appendEntryImpl(entry));
}
protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry);
@Override
public final List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
return runner.runSequentially(() -> appendImpl(entries));
}
protected abstract List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> entries);
@Override
public String toString() {
return getName() + ":" + state + ":c" + getLastCommittedIndex();
}
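/**
 * Acquire the read lock wrapped in an {@link AutoCloseableLock} so that it can be released
 * by try-with-resources. A minimal usage sketch (the raftLog variable is hypothetical):
 * <pre>{@code
 * try (AutoCloseableLock readLock = raftLog.readLock()) {
 *   // read log state while holding the lock
 * }
 * }</pre>
 */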
public AutoCloseableLock readLock() {
return AutoCloseableLock.acquire(lock.readLock());
}
public AutoCloseableLock writeLock() {
return AutoCloseableLock.acquire(lock.writeLock());
}
public boolean hasWriteLock() {
return this.lock.isWriteLockedByCurrentThread();
}
public boolean hasReadLock() {
return this.lock.getReadHoldCount() > 0 || hasWriteLock();
}
@Override
public void close() throws IOException {
state.close();
}
public String getName() {
return name;
}
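/** Wrap a log entry together with a future supplying its state machine data, if any. */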
protected EntryWithData newEntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
return new EntryWithDataImpl(logEntry, future);
}
/**
* Holds a log entry proto along with a future that supplies the state machine data read for it.
*/
class EntryWithDataImpl implements EntryWithData {
private final LogEntryProto logEntry;
private final CompletableFuture<ByteString> future;
EntryWithDataImpl(LogEntryProto logEntry, CompletableFuture<ByteString> future) {
this.logEntry = logEntry;
this.future = future == null? null: future.thenApply(this::checkStateMachineData);
}
private ByteString checkStateMachineData(ByteString data) {
if (data == null) {
throw new IllegalStateException("State machine data is null for log entry " + logEntry);
}
return data;
}
@Override
public int getSerializedSize() {
return LogProtoUtils.getSerializedSize(logEntry);
}
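/**
 * Wait for the state machine data (bounded by the given timeout) and return the complete
 * log entry; fail with a RaftLogIOException if the data is missing or cannot be read.
 */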
@Override
public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
LogEntryProto entryProto;
if (future == null) {
return logEntry;
}
try {
entryProto = future.thenApply(data -> LogProtoUtils.addStateMachineData(data, logEntry))
.get(timeout.getDuration(), timeout.getUnit());
} catch (TimeoutException t) {
if (timeout.compareTo(stateMachineDataReadTimeout) > 0) {
getRaftLogMetrics().onStateMachineDataReadTimeout();
}
throw t;
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry);
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
}
// by this time we have already read the state machine data,
// so the log entry data should be set now
if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
final String err = getName() + ": State machine data not set for " + toLogEntryString(logEntry);
LOG.error(err);
throw new RaftLogIOException(err);
}
return entryProto;
}
@Override
public String toString() {
return toLogEntryString(logEntry);
}
}
public String toLogEntryString(LogEntryProto logEntry) {
return LogProtoUtils.toLogEntryString(logEntry);
}
}