/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ratis.server.raftlog.segmented;

import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog.Task;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * This class is responsible for all the Raft-log-related I/O operations of a
 * raft peer.
 */
class SegmentedRaftLogWorker {
static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
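  // Task flow (an illustrative sketch, not part of the API):
  //   a caller holding the RaftLog lock submits tasks, e.g.
  //     worker.writeLogEntry(entry, removedStateMachineData, context).getFuture();
  //   addIOTask(..) enqueues the task into 'queue'; the single worker thread
  //   polls it in run(), executes it (write/roll/truncate/purge), and flushes
  //   when shouldFlush() holds; flushing advances flushIndex, which in turn
  //   completes the pending WriteLog futures via writeTasks.updateIndex(..).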
static class StateMachineDataPolicy {
private final boolean sync;
private final TimeDuration syncTimeout;
private final int syncTimeoutRetry;
private final SegmentedRaftLogMetrics metrics;
StateMachineDataPolicy(RaftProperties properties, SegmentedRaftLogMetrics metricRegistry) {
this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
this.syncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
this.syncTimeoutRetry = RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties);
this.metrics = metricRegistry;
Preconditions.assertTrue(syncTimeoutRetry >= -1);
}
boolean isSync() {
return sync;
}
void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) throws IOException {
Preconditions.assertTrue(isSync());
TimeoutIOException lastException = null;
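      // syncTimeoutRetry == -1 means retry indefinitely; otherwise retry up to syncTimeoutRetry times.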
for(int retry = 0; syncTimeoutRetry == -1 || retry <= syncTimeoutRetry; retry++) {
try {
IOUtils.getFromFuture(future, getName, syncTimeout);
return;
} catch(TimeoutIOException e) {
LOG.warn("Timeout " + retry + (syncTimeoutRetry == -1? "/~": "/" + syncTimeoutRetry), e);
lastException = e;
metrics.onStateMachineDataWriteTimeout();
}
}
Objects.requireNonNull(lastException, "lastException == null");
throw lastException;
}
}
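  /**
   * A queue of {@link WriteLog} tasks whose futures are completed, in index
   * order, once the log has been flushed up to their end indices; see
   * {@link #updateIndex(long)}.
   */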
static class WriteLogTasks {
private final Queue<WriteLog> q = new LinkedList<>();
private volatile long index;
void offerOrCompleteFuture(WriteLog writeLog) {
if (writeLog.getEndIndex() <= index || !offer(writeLog)) {
writeLog.completeFuture();
}
}
private synchronized boolean offer(WriteLog writeLog) {
if (writeLog.getEndIndex() <= index) { // compare again synchronized
return false;
}
q.offer(writeLog);
return true;
}
synchronized void updateIndex(long i) {
index = i;
for(;;) {
final Task peeked = q.peek();
if (peeked == null || peeked.getEndIndex() > index) {
return;
}
final Task polled = q.poll();
Preconditions.assertTrue(polled == peeked);
polled.completeFuture();
}
}
}
private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", this, s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", this, s);
private final String name;
  /**
   * The task queue accessed by the RPC handler threads and the I/O worker thread.
   */
private final DataBlockingQueue<Task> queue;
private final WriteLogTasks writeTasks = new WriteLogTasks();
private volatile boolean running = true;
private final ExecutorService workerThreadExecutor;
private final RaftStorage storage;
private volatile SegmentedRaftLogOutputStream out;
private final Runnable submitUpdateCommitEvent;
private final StateMachine stateMachine;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final ByteBuffer writeBuffer;
  /**
   * The number of entries that have been written into the SegmentedRaftLogOutputStream
   * but have not yet been flushed.
   */
private int pendingFlushNum = 0;
/** the index of the last entry that has been written */
private long lastWrittenIndex;
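  /** the number of entries in the latest flush batch; exposed as a metrics gauge */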
private volatile int flushBatchSize = 0;
/** the largest index of the entry that has been flushed */
private final RaftLogIndex flushIndex = new RaftLogIndex("flushIndex", 0);
  /** the index up to which the cache can be evicted: the max of the snapshot index
   * and the largest index in a closed segment */
private final RaftLogIndex safeCacheEvictIndex = new RaftLogIndex("safeCacheEvictIndex", 0);
private final int forceSyncNum;
private final long segmentMaxSize;
private final long preallocatedSize;
private final RaftServer.Division server;
private final boolean asyncFlush;
private final boolean unsafeFlush;
private final ExecutorService flushExecutor;
private final StateMachineDataPolicy stateMachineDataPolicy;
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftServer.Division server, RaftStorage storage, RaftProperties properties,
SegmentedRaftLogMetrics metricRegistry) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
LOG.info("new {} for {}", name, storage);
this.submitUpdateCommitEvent = submitUpdateCommitEvent;
this.stateMachine = stateMachine;
this.raftLogMetrics = metricRegistry;
this.storage = storage;
this.server = server;
final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
final int queueElementLimit = RaftServerConfigKeys.Log.queueElementLimit(properties);
this.queue =
new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit, Task::getSerializedSize);
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
this.stateMachineDataPolicy = new StateMachineDataPolicy(properties, metricRegistry);
this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(name);
// Server Id can be null in unit tests
metricRegistry.addDataQueueSizeGauge(queue::getNumElements);
metricRegistry.addLogWorkerQueueSizeGauge(writeTasks.q::size);
metricRegistry.addFlushBatchSizeGauge(() -> flushBatchSize);
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
// 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
if (bufferSize < logEntryLimit + 8) {
throw new IllegalArgumentException(RaftServerConfigKeys.Log.WRITE_BUFFER_SIZE_KEY
+ " (= " + bufferSize
+ ") is less than " + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY
+ " + 8 (= " + (logEntryLimit + 8) + ")");
}
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
if (asyncFlush && unsafeFlush) {
throw new IllegalStateException("Cannot enable both " + RaftServerConfigKeys.Log.UNSAFE_FLUSH_ENABLED_KEY +
" and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY);
}
this.flushExecutor = (!asyncFlush && !unsafeFlush)? null
: ConcurrentUtils.newSingleThreadExecutor(name + "-flush");
}
void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
LOG.trace("{} start(latestIndex={}, openSegmentFile={})", name, latestIndex, openSegmentFile);
lastWrittenIndex = latestIndex;
flushIndex.setUnconditionally(latestIndex, infoIndexChange);
safeCacheEvictIndex.setUnconditionally(evictIndex, infoIndexChange);
if (openSegmentFile != null) {
Preconditions.assertTrue(openSegmentFile.exists());
allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
}
workerThreadExecutor.submit(this::run);
}
void close() {
this.running = false;
Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3),
workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + timeout, name));
IOUtils.cleanup(LOG, out);
LOG.info("{} close()", name);
}
  /**
   * A snapshot has just been installed on the follower,
   * so the I/O worker's state must be updated accordingly.
   */
void syncWithSnapshot(long lastSnapshotIndex) {
queue.clear();
lastWrittenIndex = lastSnapshotIndex;
flushIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);
safeCacheEvictIndex.setUnconditionally(lastSnapshotIndex, infoIndexChange);
pendingFlushNum = 0;
}
@Override
public String toString() {
return name;
}
/**
* This is protected by the RaftServer and RaftLog's lock.
*/
private Task addIOTask(Task task) {
LOG.debug("{} adds IO task {}", name, task);
try(UncheckedAutoCloseable ignored = raftLogMetrics.startQueuingDelayTimer()) {
      while (!queue.offer(task, ONE_SECOND)) {
        Preconditions.assertTrue(isAlive(), "the worker thread is not alive");
      }
} catch (Exception e) {
if (e instanceof InterruptedException && !running) {
LOG.info("Got InterruptedException when adding task " + task
+ ". The SegmentedRaftLogWorker already stopped.");
Thread.currentThread().interrupt();
} else {
LOG.error("Failed to add IO task {}", task, e);
Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
}
}
task.startTimerOnEnqueue(raftLogMetrics.getEnqueuedTimer());
return task;
}
boolean isAlive() {
return running && !workerThreadExecutor.isTerminated();
}
private void run() {
    // set when a log task encounters an exception; all subsequent tasks then fail fast
    RaftLogIOException logIOException = null;
while (running) {
try {
Task task = queue.poll(ONE_SECOND);
if (task != null) {
task.stopTimerOnDequeue();
try {
if (logIOException != null) {
throw logIOException;
} else {
try (UncheckedAutoCloseable ignored = raftLogMetrics.startTaskExecutionTimer(task.getClass())) {
task.execute();
}
}
} catch (IOException e) {
if (task.getEndIndex() < lastWrittenIndex) {
LOG.info("Ignore IOException when handling task " + task
+ " which is smaller than the lastWrittenIndex."
+ " There should be a snapshot installed.", e);
} else {
task.failed(e);
if (logIOException == null) {
logIOException = new RaftLogIOException("Log already failed"
+ " at index " + task.getEndIndex()
+ " for task " + task, e);
}
continue;
}
}
task.done();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (running) {
LOG.warn("{} got interrupted while still running",
Thread.currentThread().getName());
}
LOG.info(Thread.currentThread().getName()
+ " was interrupted, exiting. There are " + queue.getNumElements()
+ " tasks remaining in the queue.");
return;
} catch (Exception e) {
if (!running) {
LOG.info("{} got closed and hit exception",
Thread.currentThread().getName(), e);
} else {
LOG.error("{} hit exception", Thread.currentThread().getName(), e);
Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
}
}
}
}
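  /** Flush when the pending entries reach the force-sync threshold, or when there is pending data and the queue is idle. */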
private boolean shouldFlush() {
if (out == null) {
return false;
} else if (pendingFlushNum >= forceSyncNum) {
return true;
}
return pendingFlushNum > 0 && queue.isEmpty();
}
private void flushIfNecessary() throws IOException {
if (shouldFlush()) {
raftLogMetrics.onRaftLogFlush();
LOG.debug("{}: flush {}", name, out);
try(UncheckedAutoCloseable ignored = raftLogMetrics.startFlushTimer()) {
final CompletableFuture<Void> f = stateMachine != null ?
stateMachine.data().flush(lastWrittenIndex) :
CompletableFuture.completedFuture(null);
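        // When the sync policy is enabled, block (with timeout retries) until the state machine data flush completes.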
if (stateMachineDataPolicy.isSync()) {
stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
}
flushBatchSize = (int)(lastWrittenIndex - flushIndex.get());
if (unsafeFlush) {
          // unsafe-flush: call updateFlushedIndexIncreasingly() without waiting for the underlying FileChannel.force(..).
unsafeFlushOutStream();
updateFlushedIndexIncreasingly();
} else if (asyncFlush) {
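          // async-flush: advance the flushed index only after both the log flush
          // and the state machine data flush have completed.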
asyncFlushOutStream(f);
} else {
flushOutStream();
if (!stateMachineDataPolicy.isSync()) {
IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
}
updateFlushedIndexIncreasingly();
}
}
}
}
private void unsafeFlushOutStream() throws IOException {
final Timekeeper.Context logSyncTimerContext = raftLogMetrics.getSyncTimer().time();
out.asyncFlush(flushExecutor).whenComplete((v, e) -> logSyncTimerContext.stop());
}
private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) throws IOException {
    final Timekeeper.Context logSyncTimerContext = raftLogMetrics.getSyncTimer().time();
    // Capture the index now: the worker thread may advance lastWrittenIndex before the flush completes.
    final long index = lastWrittenIndex;
    out.asyncFlush(flushExecutor)
        .thenCombine(stateMachineFlush, (async, sm) -> async)
        .whenComplete((v, e) -> {
          updateFlushedIndexIncreasingly(index);
logSyncTimerContext.stop();
});
}
private void flushOutStream() throws IOException {
try(UncheckedAutoCloseable ignored = Timekeeper.start(raftLogMetrics.getSyncTimer())) {
out.flush();
}
}
private void updateFlushedIndexIncreasingly() {
updateFlushedIndexIncreasingly(lastWrittenIndex);
}
private void updateFlushedIndexIncreasingly(long index) {
flushIndex.updateIncreasingly(index, traceIndexChange);
postUpdateFlushedIndex(Math.toIntExact(lastWrittenIndex - index));
writeTasks.updateIndex(index);
}
private void postUpdateFlushedIndex(int count) {
pendingFlushNum = count;
Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
}
  /**
   * The following methods (startLogSegment, rollLogSegment,
   * writeLogEntry, and truncate) are only called by SegmentedRaftLog, which is
   * protected by the RaftServer's lock.
   * Thus, all the tasks are created and added sequentially.
   */
void startLogSegment(long startIndex) {
LOG.info("{}: Starting segment from index:{}", name, startIndex);
addIOTask(new StartLogSegment(startIndex));
}
void rollLogSegment(LogSegment segmentToClose) {
LOG.info("{}: Rolling segment {} to index:{}", name,
segmentToClose.toString(), segmentToClose.getEndIndex());
addIOTask(new FinalizeLogSegment(segmentToClose));
addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
}
Task writeLogEntry(LogEntryProto entry, LogEntryProto removedStateMachineData, TransactionContext context) {
return addIOTask(new WriteLog(entry, removedStateMachineData, context));
}
Task truncate(TruncationSegments ts, long index) {
LOG.info("{}: Truncating segments {}, start index {}", name, ts, index);
return addIOTask(new TruncateLog(ts, index));
}
void closeLogSegment(LogSegment segmentToClose) {
LOG.info("{}: Closing segment {} to index: {}", name,
segmentToClose.toString(), segmentToClose.getEndIndex());
addIOTask(new FinalizeLogSegment(segmentToClose));
}
Task purge(TruncationSegments ts) {
return addIOTask(new PurgeLog(ts));
}
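  /** Delete the files of the purged log segments. */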
private final class PurgeLog extends Task {
private final TruncationSegments segments;
private PurgeLog(TruncationSegments segments) {
this.segments = segments;
}
@Override
void execute() throws IOException {
if (segments.getToDelete() != null) {
try(UncheckedAutoCloseable ignored = raftLogMetrics.startPurgeTimer()) {
for (SegmentFileInfo fileInfo : segments.getToDelete()) {
FileUtils.deleteFile(fileInfo.getFile(storage));
}
}
}
}
@Override
long getEndIndex() {
return segments.maxEndIndex();
}
}
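  /** Write a log entry to the open segment; any state machine data is written or linked via the {@link StateMachine}. */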
private class WriteLog extends Task {
private final LogEntryProto entry;
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, TransactionContext context) {
this.entry = removedStateMachineData;
if (this.entry == entry) {
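        // No state machine data was removed from this entry; for a DATASTREAM
        // entry, link the already-streamed data instead of writing it here.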
final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
if (stateMachine != null && proto != null && proto.getType() == StateMachineLogEntryProto.Type.DATASTREAM) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(proto);
final CompletableFuture<DataStream> removed = server.getDataStreamMap().remove(invocationId);
this.stateMachineFuture = removed == null? stateMachine.data().link(null, entry)
: removed.thenApply(stream -> stateMachine.data().link(stream, entry));
} else {
this.stateMachineFuture = null;
}
} else {
try {
// this.entry != entry iff the entry has state machine data
this.stateMachineFuture = stateMachine.data().write(entry, context);
} catch (Exception e) {
LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex()
+ ", entry=" + LogProtoUtils.toLogEntryString(entry, stateMachine::toStateMachineLogEntryString), e);
throw e;
}
}
this.combined = stateMachineFuture == null? super.getFuture()
: super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index);
}
@Override
void failed(IOException e) {
stateMachine.event().notifyLogFailed(e, entry);
super.failed(e);
}
@Override
int getSerializedSize() {
return LogProtoUtils.getSerializedSize(entry);
}
@Override
CompletableFuture<Long> getFuture() {
return combined;
}
@Override
void done() {
writeTasks.offerOrCompleteFuture(this);
}
@Override
public void execute() throws IOException {
if (stateMachineDataPolicy.isSync() && stateMachineFuture != null) {
stateMachineDataPolicy.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData");
}
raftLogMetrics.onRaftLogAppendEntry();
Preconditions.assertTrue(out != null);
Preconditions.assertTrue(lastWrittenIndex + 1 == entry.getIndex(),
"lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry);
out.write(entry);
lastWrittenIndex = entry.getIndex();
pendingFlushNum++;
flushIfNecessary();
}
@Override
long getEndIndex() {
return entry.getIndex();
}
@Override
public String toString() {
return super.toString() + ": " + LogProtoUtils.toLogEntryString(
entry, stateMachine == null? null: stateMachine::toStateMachineLogEntryString);
}
}
File getFile(long startIndex, Long endIndex) {
return LogSegmentStartEnd.valueOf(startIndex, endIndex).getFile(storage);
}
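  /** Finalize the current open segment: rename it to its closed name, or delete it if it is empty. */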
private class FinalizeLogSegment extends Task {
private final long startIndex;
private final long endIndex;
FinalizeLogSegment(LogSegment segmentToClose) {
Preconditions.assertTrue(segmentToClose != null, "Log segment to be rolled is null");
this.startIndex = segmentToClose.getStartIndex();
this.endIndex = segmentToClose.getEndIndex();
}
@Override
public void execute() throws IOException {
freeSegmentedRaftLogOutputStream();
final File openFile = getFile(startIndex, null);
Preconditions.assertTrue(openFile.exists(),
() -> name + ": File " + openFile + " to be rolled does not exist");
if (endIndex - startIndex + 1 > 0) {
// finalize the current open segment
final File dstFile = getFile(startIndex, endIndex);
Preconditions.assertTrue(!dstFile.exists());
FileUtils.move(openFile, dstFile);
LOG.info("{}: Rolled log segment from {} to {}", name, openFile, dstFile);
} else { // delete the file of the empty segment
FileUtils.deleteFile(openFile);
LOG.info("{}: Deleted empty log segment {}", name, openFile);
}
updateFlushedIndexIncreasingly();
safeCacheEvictIndex.updateToMax(endIndex, traceIndexChange);
}
@Override
void failed(IOException e) {
// not failed for a specific log entry, but an entire segment
stateMachine.event().notifyLogFailed(e, null);
super.failed(e);
}
@Override
long getEndIndex() {
return endIndex;
}
@Override
public String toString() {
return super.toString() + ": " + "startIndex=" + startIndex + " endIndex=" + endIndex;
}
}
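  /** Create the file for a new open log segment and allocate its output stream. */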
private class StartLogSegment extends Task {
private final long newStartIndex;
StartLogSegment(long newStartIndex) {
this.newStartIndex = newStartIndex;
}
@Override
void execute() throws IOException {
final File openFile = getFile(newStartIndex, null);
Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s",
openFile, name);
Preconditions.assertTrue(pendingFlushNum == 0);
allocateSegmentedRaftLogOutputStream(openFile, false);
Preconditions.assertTrue(openFile.exists(), "Failed to create file %s for %s",
openFile.getAbsolutePath(), name);
LOG.info("{}: created new log segment {}", name, openFile);
}
@Override
long getEndIndex() {
return newStartIndex;
}
}
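  /** Delete and/or truncate segment files and reset the write/flush indices accordingly. */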
private class TruncateLog extends Task {
private final TruncationSegments segments;
private CompletableFuture<Void> stateMachineFuture = null;
TruncateLog(TruncationSegments ts, long index) {
this.segments = ts;
if (stateMachine != null) {
      // TruncateLog and WriteLog instances are created while holding the RaftLog write lock.
      // The StateMachine call is made inside the constructor so that it is
      // protected by the lock. This ensures that the state machine can determine
      // which indices to truncate, since the state machine calls happen in the
      // same sequence as the log operations.
stateMachineFuture = stateMachine.data().truncate(index);
}
}
@Override
void execute() throws IOException {
freeSegmentedRaftLogOutputStream();
if (segments.getToDelete() != null && segments.getToDelete().length > 0) {
long minStart = segments.getToDelete()[0].getStartIndex();
for (SegmentFileInfo del : segments.getToDelete()) {
final File delFile = del.getFile(storage);
Preconditions.assertTrue(delFile.exists(),
"File %s to be deleted does not exist", delFile);
FileUtils.deleteFile(delFile);
LOG.info("{}: Deleted log file {}", name, delFile);
minStart = Math.min(minStart, del.getStartIndex());
}
if (segments.getToTruncate() == null) {
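          // Everything from minStart onwards was deleted, so the log now ends just before minStart.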
lastWrittenIndex = minStart - 1;
}
}
if (segments.getToTruncate() != null) {
final File fileToTruncate = segments.getToTruncate().getFile(storage);
Preconditions.assertTrue(fileToTruncate.exists(),
"File %s to be truncated does not exist", fileToTruncate);
FileUtils.truncateFile(fileToTruncate, segments.getToTruncate().getTargetLength());
// rename the file
final File dstFile = segments.getToTruncate().getNewFile(storage);
      Preconditions.assertTrue(!dstFile.exists(),
          "Truncated file %s already exists", dstFile);
FileUtils.move(fileToTruncate, dstFile);
LOG.info("{}: Truncated log file {} to length {} and moved it to {}", name,
fileToTruncate, segments.getToTruncate().getTargetLength(), dstFile);
// update lastWrittenIndex
lastWrittenIndex = segments.getToTruncate().getNewEndIndex();
}
if (stateMachineFuture != null) {
IOUtils.getFromFuture(stateMachineFuture, () -> this + "-truncateStateMachineData");
}
flushIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
safeCacheEvictIndex.setUnconditionally(lastWrittenIndex, infoIndexChange);
postUpdateFlushedIndex(0);
}
@Override
long getEndIndex() {
if (segments.getToTruncate() != null) {
return segments.getToTruncate().getNewEndIndex();
} else if (segments.getToDelete().length > 0) {
return segments.getToDelete()[segments.getToDelete().length - 1].getEndIndex();
}
return RaftLog.INVALID_LOG_INDEX;
}
@Override
public String toString() {
return super.toString() + ": " + segments;
}
}
long getFlushIndex() {
return flushIndex.get();
}
long getSafeCacheEvictIndex() {
return safeCacheEvictIndex.get();
}
private void freeSegmentedRaftLogOutputStream() {
IOUtils.cleanup(LOG, out);
out = null;
Preconditions.assertTrue(writeBuffer.position() == 0);
}
private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
Preconditions.assertNull(out, "out");
Preconditions.assertSame(0, writeBuffer.position(), "writeBuffer.position()");
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
preallocatedSize, writeBuffer);
}
}