/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
/**
* The RaftLog implementation that writes log entries into segmented files on
* the local disk.
*
* The maximum log segment size is 8MB. The actual log segment size may not be
* exactly equal to this limit: if a log entry's size exceeds 8MB, the entry is
* still stored entirely within a single segment (an entry is never split
* across segments).
*
* There are two types of segments: closed segments and open segments. The
* former are named "log_startindex-endindex", the latter
* "log_inprogress_startindex".
*
* There can be multiple closed segments but at most one open segment. When the
* open segment reaches the size limit, or the log term increases, we close the
* open segment and start a new one. A closed segment can no longer be appended
* to, but it can be truncated in case a follower's log is inconsistent with
* the current leader's.
*
* Every closed segment must be non-empty, i.e., it must contain at least one
* entry.
*
* There must not be any gap between segments. The first segment may not start
* at index 0 since earlier entries may have been compacted into a snapshot.
* The last index in the segments must be no smaller than the last index of the
* snapshot; otherwise there may be a hole when further log entries are
* appended.
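*
* As an illustration of the naming convention above (the indices are
* hypothetical), a raft log storage directory may contain files such as:
* <pre>
*   log_0-999             a closed segment covering indices 0..999
*   log_1000-1999         a closed segment covering indices 1000..1999
*   log_inprogress_2000   the open segment starting at index 2000
* </pre>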
*/
public class SegmentedRaftLog extends RaftLogBase {
/**
* I/O task definitions.
*
* A task is executed asynchronously by the log worker; once it finishes, its
* future is completed with the task's end index.
*/
abstract static class Task {
private final CompletableFuture<Long> future = new CompletableFuture<>();
private Timer.Context queueTimerContext;
CompletableFuture<Long> getFuture() {
return future;
}
void done() {
completeFuture();
}
final void completeFuture() {
final boolean completed = future.complete(getEndIndex());
Preconditions.assertTrue(completed,
() -> this + " is already " + StringUtils.completableFuture2String(future, false));
}
void failed(IOException e) {
this.getFuture().completeExceptionally(e);
}
abstract void execute() throws IOException;
abstract long getEndIndex();
void startTimerOnEnqueue(Timer queueTimer) {
queueTimerContext = queueTimer.time();
}
void stopTimerOnDequeue() {
if (queueTimerContext != null) {
queueTimerContext.stop();
}
}
int getSerializedSize() {
return 0;
}
@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + getEndIndex();
}
}
/** The server methods used in {@link SegmentedRaftLog}. */
interface ServerLogMethods {
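/** A no-op implementation used when no server division is available. */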
ServerLogMethods DUMMY = new ServerLogMethods() {};
default long[] getFollowerNextIndices() {
return null;
}
default long getLastAppliedIndex() {
return INVALID_LOG_INDEX;
}
/** Notify the server that a log entry is being truncated. */
default void notifyTruncatedLogEntry(TermIndex ti) {
}
}
/**
* When the given server is null, return the dummy instance of {@link ServerLogMethods}.
* Otherwise, return an implementation backed by the given server.
*/
private ServerLogMethods newServerLogMethods(RaftServer.Division impl,
Consumer<LogEntryProto> notifyTruncatedLogEntry) {
if (impl == null) {
return ServerLogMethods.DUMMY;
}
return new ServerLogMethods() {
@Override
public long[] getFollowerNextIndices() {
return impl.getInfo().getFollowerNextIndices();
}
@Override
public long getLastAppliedIndex() {
return impl.getInfo().getLastAppliedIndex();
}
@Override
public void notifyTruncatedLogEntry(TermIndex ti) {
try {
final LogEntryProto entry = get(ti.getIndex());
notifyTruncatedLogEntry.accept(entry);
} catch (RaftLogIOException e) {
LOG.error("{}: Failed to read log {}", getName(), ti, e);
}
}
};
}
private final ServerLogMethods server;
private final RaftStorage storage;
private final StateMachine stateMachine;
private final SegmentedRaftLogCache cache;
private final SegmentedRaftLogWorker fileLogWorker;
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
private final SegmentedRaftLogMetrics metrics;
@SuppressWarnings("parameternumber")
public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server,
StateMachine stateMachine, Consumer<LogEntryProto> notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent,
RaftStorage storage, LongSupplier snapshotIndexSupplier, RaftProperties properties) {
super(memberId, snapshotIndexSupplier, properties);
this.metrics = new SegmentedRaftLogMetrics(memberId);
this.server = newServerLogMethods(server, notifyTruncatedLogEntry);
this.storage = storage;
this.stateMachine = stateMachine;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics());
this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics());
stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
}
@Override
public SegmentedRaftLogMetrics getRaftLogMetrics() {
return metrics;
}
@Override
protected void openImpl(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer) throws IOException {
loadLogSegments(lastIndexInSnapshot, consumer);
final File openSegmentFile = Optional.ofNullable(cache.getOpenSegment()).map(LogSegment::getFile).orElse(null);
fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
Math.min(cache.getLastIndexInClosedSegments(), lastIndexInSnapshot),
openSegmentFile);
}
@Override
public long getStartIndex() {
return cache.getStartIndex();
}
private void loadLogSegments(long lastIndexInSnapshot,
Consumer<LogEntryProto> logConsumer) throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
final List<LogSegmentPath> paths = LogSegmentPath.getLogSegmentPaths(storage);
int i = 0;
for (LogSegmentPath pi : paths) {
// During the initial loading, we can only confirm the committed
// index based on the snapshot. This means that if a log segment is not kept
// in the cache after the initial loading, we will have to load its content
// again later in order to update the state machine.
// TODO: the raft peer should persist its committed index periodically
// so that, during the initial loading, we can apply part of the log
// entries to the state machine.
boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
final Timer.Context loadSegmentContext = getRaftLogMetrics().getRaftLogLoadSegmentTimer().time();
cache.loadSegment(pi, keepEntryInCache, logConsumer);
loadSegmentContext.stop();
}
// If the largest index is smaller than the last index in the snapshot, we do
// not load the log, to avoid holes between log segments. This may happen
// when the local I/O worker is too slow to persist the log (slower than
// committing the log and taking a snapshot).
if (!cache.isEmpty() && cache.getEndIndex() < lastIndexInSnapshot) {
LOG.warn("End log index {} is smaller than last index in snapshot {}",
cache.getEndIndex(), lastIndexInSnapshot);
purgeImpl(lastIndexInSnapshot);
}
}
}
@Override
public LogEntryProto get(long index) throws RaftLogIOException {
checkLogState();
final LogSegment segment;
final LogRecord record;
try (AutoCloseableLock readLock = readLock()) {
segment = cache.getSegment(index);
if (segment == null) {
return null;
}
record = segment.getLogRecord(index);
if (record == null) {
return null;
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
}
}
// the entry is not in the segment's cache. Load the cache without holding the lock.
getRaftLogMetrics().onRaftLogCacheMiss();
checkAndEvictCache();
return segment.loadCache(record);
}
@Override
public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
final LogEntryProto entry = get(index);
if (entry == null) {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
return newEntryWithData(entry, null);
}
try {
CompletableFuture<ByteString> future = null;
if (stateMachine != null) {
future = stateMachine.data().read(entry).exceptionally(ex -> {
stateMachine.event().notifyLogFailed(ex, entry);
throw new CompletionException("Failed to read state machine data for log entry " + entry, ex);
});
}
return newEntryWithData(entry, future);
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " +
LogProtoUtils.toLogEntryString(entry);
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
}
}
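/** Evict cached segment entries if the cache has grown beyond its configured limit. */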
private void checkAndEvictCache() {
if (cache.shouldEvict()) {
// TODO: if the cache has hit its maximum size and we cannot evict any
// segment's cache, we should block new entry appends or new segment
// allocation.
cache.evictCache(server.getFollowerNextIndices(), fileLogWorker.getSafeCacheEvictIndex(),
server.getLastAppliedIndex());
}
}
@Override
public TermIndex getTermIndex(long index) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
LogRecord record = cache.getLogRecord(index);
return record != null ? record.getTermIndex() : null;
}
}
@Override
public LogEntryHeader[] getEntries(long startIndex, long endIndex) {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getTermIndices(startIndex, endIndex);
}
}
@Override
public TermIndex getLastEntryTermIndex() {
checkLogState();
try(AutoCloseableLock readLock = readLock()) {
return cache.getLastTermIndex();
}
}
@Override
protected CompletableFuture<Long> truncateImpl(long index) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
SegmentedRaftLogCache.TruncationSegments ts = cache.truncate(index);
if (ts != null) {
Task task = fileLogWorker.truncate(ts, index);
return task.getFuture();
}
}
return CompletableFuture.completedFuture(index);
}
@Override
protected CompletableFuture<Long> purgeImpl(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
SegmentedRaftLogCache.TruncationSegments ts = cache.purge(index);
updateSnapshotIndexFromStateMachine();
LOG.debug("purging segments:{}", ts);
if (ts != null) {
Task task = fileLogWorker.purge(ts);
return task.getFuture();
}
}
return CompletableFuture.completedFuture(index);
}
@Override
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry));
}
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
final LogSegment currentOpenSegment = cache.getOpenSegment();
if (currentOpenSegment == null) {
cache.addOpenSegment(entry.getIndex());
fileLogWorker.startLogSegment(entry.getIndex());
} else if (isSegmentFull(currentOpenSegment, entry)) {
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
} else if (currentOpenSegment.numOfEntries() > 0 &&
currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
// the term changes
final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm();
Preconditions.assertTrue(currentTerm < entry.getTerm(),
"open segment's term %s is larger than the new entry's term %s",
currentTerm, entry.getTerm());
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
}
//TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
checkAndEvictCache();
// If the entry has state machine data, then the entry should be inserted
// into the state machine first and then into the cache. Not following this
// order will leave a spurious entry in the cache.
CompletableFuture<Long> writeFuture =
fileLogWorker.writeLogEntry(entry).getFuture();
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
} else {
cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
return writeFuture;
} catch (Exception e) {
LOG.error("{}: Failed to append {}", getName(), LogProtoUtils.toLogEntryString(entry), e);
throw e;
} finally {
context.stop();
}
}
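/**
* Check whether the given open segment should be considered full when
* appending the given entry, based on the configured maximum segment size.
*/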
private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
if (segment.getTotalFileSize() >= segmentMaxSize) {
return true;
} else {
final long entrySize = LogSegment.getEntrySize(entry, LogSegment.Op.CHECK_SEGMENT_FILE_FULL);
// If the entry itself is larger than the max segment size, it is written
// directly into the current segment; otherwise the segment is full if
// appending the entry would exceed the size limit.
return entrySize <= segmentMaxSize &&
segment.getTotalFileSize() + entrySize > segmentMaxSize;
}
}
@Override
public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> entries) {
checkLogState();
if (entries == null || entries.isEmpty()) {
return Collections.emptyList();
}
try(AutoCloseableLock writeLock = writeLock()) {
final TruncateIndices ti = cache.computeTruncateIndices(server::notifyTruncatedLogEntry, entries);
final long truncateIndex = ti.getTruncateIndex();
final int index = ti.getArrayIndex();
LOG.debug("truncateIndex={}, arrayIndex={}", truncateIndex, index);
final List<CompletableFuture<Long>> futures;
if (truncateIndex != -1) {
futures = new ArrayList<>(entries.size() - index + 1);
futures.add(truncate(truncateIndex));
} else {
futures = new ArrayList<>(entries.size() - index);
}
for (int i = index; i < entries.size(); i++) {
futures.add(appendEntry(entries.get(i)));
}
return futures;
}
}
@Override
public long getFlushIndex() {
return fileLogWorker.getFlushIndex();
}
@Override
public void persistMetadata(RaftStorageMetadata metadata) throws IOException {
storage.getMetadataFile().persist(metadata);
}
@Override
public RaftStorageMetadata loadMetadata() throws IOException {
return storage.getMetadataFile().getMetadata();
}
@Override
public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
fileLogWorker.syncWithSnapshot(lastSnapshotIndex);
// TODO: purge normal/tmp/corrupt snapshot files
// If the last index in the snapshot is larger than the index of the last
// log entry, we should delete all the log entries and their cache to avoid
// gaps between log segments.
// Close the open log segment if its entries are already included in the snapshot.
LogSegment openSegment = cache.getOpenSegment();
if (openSegment != null && openSegment.hasEntries()) {
if (LOG.isDebugEnabled()) {
LOG.debug("syncWithSnapshot : Found open segment {}, with end index {},"
+ " snapshotIndex {}", openSegment, openSegment.getEndIndex(),
lastSnapshotIndex);
}
if (openSegment.getEndIndex() <= lastSnapshotIndex) {
fileLogWorker.closeLogSegment(openSegment);
cache.rollOpenSegment(false);
}
}
return purgeImpl(lastSnapshotIndex);
}
@Override
public void close() throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
super.close();
cache.close();
}
fileLogWorker.close();
storage.close();
getRaftLogMetrics().unregister();
}
SegmentedRaftLogCache getRaftLogCache() {
return cache;
}
@Override
public String toLogEntryString(LogEntryProto logEntry) {
return LogProtoUtils.toLogEntryString(logEntry, stateMachine::toStateMachineLogEntryString);
}
}