/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog.segmented;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog.Task;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* This class is responsible for all the Raft-log-related I/O operations of a
* raft peer.
*/
class SegmentedRaftLogWorker implements Runnable {
static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
private final RatisMetricRegistry metricRegistry;
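/**
* Policy for synchronizing state machine data with the log I/O.
* When sync is enabled, callers block on the state machine data future,
* retrying on timeout up to syncTimeoutRetry times; -1 means retry
* indefinitely.
*/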
static class StateMachineDataPolicy {
private final boolean sync;
private final TimeDuration syncTimeout;
private final int syncTimeoutRetry;
StateMachineDataPolicy(RaftProperties properties) {
this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
this.syncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
this.syncTimeoutRetry = RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties);
Preconditions.assertTrue(syncTimeoutRetry >= -1);
}
boolean isSync() {
return sync;
}
void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) throws IOException {
Preconditions.assertTrue(isSync());
TimeoutIOException lastException = null;
for(int retry = 0; syncTimeoutRetry == -1 || retry <= syncTimeoutRetry; retry++) {
try {
IOUtils.getFromFuture(future, getName, syncTimeout);
return;
} catch(TimeoutIOException e) {
LOG.warn("Timeout " + retry + (syncTimeoutRetry == -1? "/~": "/" + syncTimeoutRetry), e);
lastException = e;
}
}
Objects.requireNonNull(lastException, "lastException == null");
throw lastException;
}
}
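/**
* The pending {@link WriteLog} tasks, whose futures are completed once the
* flush index catches up with their end indices. A task whose end index has
* already been flushed is completed immediately instead of being queued.
*/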
static class WriteLogTasks {
private final Queue<WriteLog> q = new LinkedList<>();
private volatile long index;
void offerOrCompleteFuture(WriteLog writeLog) {
if (writeLog.getEndIndex() <= index || !offer(writeLog)) {
writeLog.completeFuture();
}
}
private synchronized boolean offer(WriteLog writeLog) {
if (writeLog.getEndIndex() <= index) { // re-check under the monitor lock
return false;
}
q.offer(writeLog);
return true;
}
synchronized void updateIndex(long i) {
index = i;
for(;;) {
final Task peeked = q.peek();
if (peeked == null || peeked.getEndIndex() > index) {
return;
}
final Task polled = q.poll();
Preconditions.assertTrue(polled == peeked);
polled.completeFuture();
}
}
}
private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", this, s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", this, s);
private final String name;
/**
* The task queue accessed by the RPC handler threads and the I/O worker thread.
*/
private final DataBlockingQueue<Task> queue;
private final WriteLogTasks writeTasks = new WriteLogTasks();
private volatile boolean running = true;
private final Thread workerThread;
private final RaftStorage storage;
private volatile SegmentedRaftLogOutputStream out;
private final Runnable submitUpdateCommitEvent;
private final StateMachine stateMachine;
private final Timer logFlushTimer;
/**
* The number of entries that have been written into the SegmentedRaftLogOutputStream
* but have not yet been flushed.
*/
private int pendingFlushNum = 0;
/** the index of the last entry that has been written */
private long lastWrittenIndex;
/** the index of the last entry that has been flushed */
private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0);
private final int forceSyncNum;
private final long segmentMaxSize;
private final long preallocatedSize;
private final int bufferSize;
private final RaftServerImpl server;
private final StateMachineDataPolicy stateMachineDataPolicy;
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftServerImpl server, RaftStorage storage, RaftProperties properties) {
this.name = memberId + "-" + getClass().getSimpleName();
LOG.info("new {} for {}", name, storage);
this.submitUpdateCommitEvent = submitUpdateCommitEvent;
this.stateMachine = stateMachine;
this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
this.storage = storage;
this.server = server;
final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
final int queueElementLimit = RaftServerConfigKeys.Log.queueElementLimit(properties);
this.queue =
new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit, Task::getSerializedSize);
metricRegistry.gauge("dataQueueSize", new MetricRegistry.MetricSupplier() {
@Override public Metric newMetric() {
return new Gauge<Integer>() {
@Override public Integer getValue() {
// queue.size() is an O(1) operation
return queue.size();
}
};
}
});
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
this.stateMachineDataPolicy = new StateMachineDataPolicy(properties);
this.workerThread = new Thread(this, name);
// the server reference can be null in unit tests; it is null-checked before use
this.logFlushTimer = metricRegistry.timer("flush-time");
}
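/**
* Initialize the worker with the latest flushed index and, if an open segment
* file exists, an output stream appending to it; then start the worker thread.
*/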
void start(long latestIndex, File openSegmentFile) throws IOException {
LOG.trace("{} start(latestIndex={}, openSegmentFile={})", name, latestIndex, openSegmentFile);
lastWrittenIndex = latestIndex;
flushIndex.setUnconditionally(latestIndex, infoIndexChange);
if (openSegmentFile != null) {
Preconditions.assertTrue(openSegmentFile.exists());
out = new SegmentedRaftLogOutputStream(openSegmentFile, true, segmentMaxSize,
preallocatedSize, bufferSize);
}
workerThread.start();
}
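/**
* Stop the worker: interrupt the worker thread, wait up to three seconds for
* it to exit, and close the current output stream.
*/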
void close() {
this.running = false;
workerThread.interrupt();
try {
workerThread.join(3000);
} catch (InterruptedException ignored) {
}
IOUtils.cleanup(LOG, out);
LOG.info("{} close()", name);
}
/**
* A snapshot has just been installed on the follower. Update the I/O
* worker's state accordingly.
*/
void syncWithSnapshot(long lastSnapshotIndex) {
queue.clear();
lastWrittenIndex = lastSnapshotIndex;
flushIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);
pendingFlushNum = 0;
}
@Override
public String toString() {
return name;
}
/**
* Add an I/O task to the queue. This is protected by the RaftServer's and
* RaftLog's locks. If the queue is full, keep retrying for as long as the
* worker thread is alive.
*/
private Task addIOTask(Task task) {
LOG.debug("{} adds IO task {}", name, task);
try {
while (!queue.offer(task, ONE_SECOND)) {
Preconditions.assertTrue(isAlive(),
"the worker thread is not alive");
}
} catch (Throwable t) {
if (t instanceof InterruptedException && !running) {
LOG.info("Got InterruptedException when adding task " + task
+ ". The SegmentedRaftLogWorker already stopped.");
} else {
LOG.error("Failed to add IO task {}", task, t);
if (server != null) {
server.shutdown(false);
}
}
}
return task;
}
boolean isAlive() {
return running && workerThread.isAlive();
}
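/**
* The worker thread's main loop: poll tasks from the queue and execute them
* in order. After the first I/O failure, the exception is remembered so that
* every subsequent task fails fast with the same cause.
*/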
@Override
public void run() {
// set when a log task hits an IOException; subsequent tasks fail fast with it
RaftLogIOException logIOException = null;
while (running) {
try {
Task task = queue.poll(ONE_SECOND);
if (task != null) {
try {
if (logIOException != null) {
throw logIOException;
} else {
task.execute();
}
} catch (IOException e) {
if (task.getEndIndex() < lastWrittenIndex) {
LOG.info("Ignore IOException when handling task " + task
+ " which is smaller than the lastWrittenIndex."
+ " There should be a snapshot installed.", e);
} else {
task.failed(e);
if (logIOException == null) {
logIOException = new RaftLogIOException("Log already failed"
+ " at index " + task.getEndIndex()
+ " for task " + task, e);
}
continue;
}
}
task.done();
}
} catch (InterruptedException e) {
if (running) {
LOG.warn("{} got interrupted while still running",
Thread.currentThread().getName());
}
LOG.info(Thread.currentThread().getName()
+ " was interrupted, exiting. There are " + queue.getNumElements()
+ " tasks remaining in the queue.");
Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
if (!running) {
LOG.info("{} got closed and hit exception",
Thread.currentThread().getName(), t);
} else {
LOG.error("{} hit exception", Thread.currentThread().getName(), t);
// Shutdown raft group instead of terminating jvm.
if (server != null) {
server.shutdown(false);
}
}
}
}
}
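/**
* Flush once pendingFlushNum reaches forceSyncNum, or as soon as the task
* queue is empty while writes are still pending, so that flushes are batched
* under load but never delayed when idle.
*/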
private boolean shouldFlush() {
return pendingFlushNum >= forceSyncNum ||
(pendingFlushNum > 0 && queue.isEmpty());
}
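/**
* Flush the log output stream together with the state machine data. Under
* the sync policy, the state machine data is flushed (with timeout and retry)
* before the log; otherwise the log is flushed first and the state machine
* data future is awaited afterwards.
*/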
private void flushWrites() throws IOException {
if (out != null) {
LOG.debug("{}: flush {}", name, out);
final Timer.Context timerContext = logFlushTimer.time();
try {
final CompletableFuture<Void> f = stateMachine != null ?
stateMachine.flushStateMachineData(lastWrittenIndex) :
CompletableFuture.completedFuture(null);
if (stateMachineDataPolicy.isSync()) {
stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
}
out.flush();
if (!stateMachineDataPolicy.isSync()) {
IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
}
} finally {
timerContext.stop();
}
updateFlushedIndexIncreasingly();
}
}
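/** Advance the flush index to the last written index and complete the pending write futures. */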
private void updateFlushedIndexIncreasingly() {
final long i = lastWrittenIndex;
flushIndex.updateIncreasingly(i, traceIndexChange);
postUpdateFlushedIndex();
writeTasks.updateIndex(i);
}
private void postUpdateFlushedIndex() {
pendingFlushNum = 0;
Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
}
/**
* The following methods (startLogSegment, rollLogSegment, writeLogEntry, and
* truncate) are called only by SegmentedRaftLog, which is protected by the
* RaftServer's lock.
*
* Thus, all the tasks are created and added sequentially.
*/
void startLogSegment(long startIndex) {
LOG.info("{}: Starting segment from index:{}", name, startIndex);
addIOTask(new StartLogSegment(startIndex));
}
void rollLogSegment(LogSegment segmentToClose) {
LOG.info("{}: Rolling segment {} to index:{}", name,
segmentToClose.toString(), segmentToClose.getEndIndex());
addIOTask(new FinalizeLogSegment(segmentToClose));
addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
}
Task writeLogEntry(LogEntryProto entry) {
return addIOTask(new WriteLog(entry));
}
Task truncate(TruncationSegments ts, long index) {
LOG.info("{}: Truncating segments {}, start index {}", name, ts, index);
return addIOTask(new TruncateLog(ts, index));
}
Task purge(TruncationSegments ts) {
return addIOTask(new PurgeLog(ts, storage));
}
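/** Delete the closed segment files described by the given {@link TruncationSegments}. */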
private static final class PurgeLog extends Task {
private final TruncationSegments segments;
private final RaftStorage storage;
private PurgeLog(TruncationSegments segments, RaftStorage storage) {
this.segments = segments;
this.storage = storage;
}
@Override
void execute() throws IOException {
if (segments.toDelete != null) {
for (SegmentFileInfo fileInfo : segments.toDelete) {
File delFile = storage.getStorageDir()
.getClosedLogFile(fileInfo.startIndex, fileInfo.endIndex);
FileUtils.deleteFile(delFile);
}
}
}
@Override
long getEndIndex() {
return segments.maxEndIndex();
}
}
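/**
* Append a single log entry to the open segment. If the entry carries state
* machine data, that data is first handed to the state machine and the entry
* is written to the log without it; the task's future completes only when
* both the log write and the state machine write have completed.
*/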
private class WriteLog extends Task {
private final LogEntryProto entry;
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
WriteLog(LogEntryProto entry) {
this.entry = ServerProtoUtils.removeStateMachineData(entry);
if (this.entry == entry || stateMachine == null) {
this.stateMachineFuture = null;
} else {
try {
// this.entry != entry iff the entry has state machine data
this.stateMachineFuture = stateMachine.writeStateMachineData(entry);
} catch (Throwable e) {
LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex()
+ ", entry=" + ServerProtoUtils.toLogEntryString(entry), e);
throw e;
}
}
this.combined = stateMachineFuture == null? super.getFuture()
: super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index);
}
@Override
void failed(IOException e) {
stateMachine.notifyLogFailed(e, entry);
super.failed(e);
}
@Override
int getSerializedSize() {
return ServerProtoUtils.getSerializedSize(entry);
}
@Override
CompletableFuture<Long> getFuture() {
return combined;
}
@Override
void done() {
writeTasks.offerOrCompleteFuture(this);
}
@Override
public void execute() throws IOException {
if (stateMachineDataPolicy.isSync() && stateMachineFuture != null) {
stateMachineDataPolicy.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData");
}
Preconditions.assertTrue(out != null);
Preconditions.assertTrue(lastWrittenIndex + 1 == entry.getIndex(),
"lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry);
out.write(entry);
lastWrittenIndex = entry.getIndex();
pendingFlushNum++;
if (shouldFlush()) {
flushWrites();
}
}
@Override
long getEndIndex() {
return entry.getIndex();
}
@Override
public String toString() {
return super.toString() + ": " + ServerProtoUtils.toLogEntryString(entry);
}
}
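/**
* Close the current open segment and finalize it on disk: a non-empty
* segment is renamed to its closed-segment file, while an empty one is
* deleted.
*/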
private class FinalizeLogSegment extends Task {
private final long startIndex;
private final long endIndex;
FinalizeLogSegment(LogSegment segmentToClose) {
Preconditions.assertTrue(segmentToClose != null, "Log segment to be rolled is null");
this.startIndex = segmentToClose.getStartIndex();
this.endIndex = segmentToClose.getEndIndex();
}
@Override
public void execute() throws IOException {
IOUtils.cleanup(LOG, out);
out = null;
File openFile = storage.getStorageDir().getOpenLogFile(startIndex);
Preconditions.assertTrue(openFile.exists(),
() -> name + ": File " + openFile + " to be rolled does not exist");
if (endIndex - startIndex + 1 > 0) {
// finalize the current open segment
File dstFile = storage.getStorageDir().getClosedLogFile(startIndex, endIndex);
Preconditions.assertTrue(!dstFile.exists());
FileUtils.move(openFile, dstFile);
LOG.info("{}: Rolled log segment from {} to {}", name, openFile, dstFile);
} else { // delete the file of the empty segment
FileUtils.deleteFile(openFile);
LOG.info("{}: Deleted empty log segment {}", name, openFile);
}
updateFlushedIndexIncreasingly();
}
@Override
void failed(IOException e) {
// not failed for a specific log entry, but an entire segment
stateMachine.notifyLogFailed(e, null);
super.failed(e);
}
@Override
long getEndIndex() {
return endIndex;
}
@Override
public String toString() {
return super.toString() + ": " + "startIndex=" + startIndex + " endIndex=" + endIndex;
}
}
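/** Create a new open segment file, and an output stream over it, starting at newStartIndex. */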
private class StartLogSegment extends Task {
private final long newStartIndex;
StartLogSegment(long newStartIndex) {
this.newStartIndex = newStartIndex;
}
@Override
void execute() throws IOException {
File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s",
openFile, name);
Preconditions.assertTrue(out == null && pendingFlushNum == 0);
out = new SegmentedRaftLogOutputStream(openFile, false, segmentMaxSize,
preallocatedSize, bufferSize);
Preconditions.assertTrue(openFile.exists(), "Failed to create file %s for %s",
openFile.getAbsolutePath(), name);
LOG.info("{}: created new log segment {}", name, openFile);
}
@Override
long getEndIndex() {
return newStartIndex;
}
}
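/**
* Truncate the log to the given index: truncate or delete the affected
* segment files, truncate the state machine data, and reset lastWrittenIndex
* and the flush index to the new end of the log.
*/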
private class TruncateLog extends Task {
private final TruncationSegments segments;
private final long truncateIndex;
TruncateLog(TruncationSegments ts, long index) {
this.segments = ts;
this.truncateIndex = index;
}
@Override
void execute() throws IOException {
IOUtils.cleanup(null, out);
out = null;
CompletableFuture<Void> stateMachineFuture = null;
if (stateMachine != null) {
stateMachineFuture = stateMachine.truncateStateMachineData(truncateIndex);
}
if (segments.toTruncate != null) {
File fileToTruncate = segments.toTruncate.isOpen ?
storage.getStorageDir().getOpenLogFile(
segments.toTruncate.startIndex) :
storage.getStorageDir().getClosedLogFile(
segments.toTruncate.startIndex,
segments.toTruncate.endIndex);
Preconditions.assertTrue(fileToTruncate.exists(),
"File %s to be truncated does not exist", fileToTruncate);
FileUtils.truncateFile(fileToTruncate, segments.toTruncate.targetLength);
// rename the file
File dstFile = storage.getStorageDir().getClosedLogFile(
segments.toTruncate.startIndex, segments.toTruncate.newEndIndex);
Preconditions.assertTrue(!dstFile.exists(),
"Truncated file %s already exists", dstFile);
FileUtils.move(fileToTruncate, dstFile);
LOG.info("{}: Truncated log file {} to length {} and moved it to {}", name,
fileToTruncate, segments.toTruncate.targetLength, dstFile);
// update lastWrittenIndex
lastWrittenIndex = segments.toTruncate.newEndIndex;
}
if (segments.toDelete != null && segments.toDelete.length > 0) {
long minStart = segments.toDelete[0].startIndex;
for (SegmentFileInfo del : segments.toDelete) {
final File delFile;
if (del.isOpen) {
delFile = storage.getStorageDir().getOpenLogFile(del.startIndex);
} else {
delFile = storage.getStorageDir()
.getClosedLogFile(del.startIndex, del.endIndex);
}
Preconditions.assertTrue(delFile.exists(),
"File %s to be deleted does not exist", delFile);
FileUtils.deleteFile(delFile);
LOG.info("{}: Deleted log file {}", name, delFile);
minStart = Math.min(minStart, del.startIndex);
}
if (segments.toTruncate == null) {
lastWrittenIndex = minStart - 1;
}
}
if (stateMachineFuture != null) {
IOUtils.getFromFuture(stateMachineFuture, () -> this + "-truncateStateMachineData");
}
flushIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
postUpdateFlushedIndex();
}
@Override
long getEndIndex() {
if (segments.toTruncate != null) {
return segments.toTruncate.newEndIndex;
} else if (segments.toDelete != null && segments.toDelete.length > 0) {
return segments.toDelete[segments.toDelete.length - 1].endIndex;
}
return RaftLog.INVALID_LOG_INDEX;
}
@Override
public String toString() {
return super.toString() + ": " + segments;
}
}
long getFlushIndex() {
return flushIndex.get();
}
}