/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.storage.impl;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.disruptor.GroupAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.error.LogEntryCorruptedException;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.LogManagerOptions;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.util.ArrayDeque;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.SegmentList;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.Utils;
/**
* LogManager implementation.
*/
public class LogManagerImpl implements LogManager {
private static final int APPEND_LOG_RETRY_TIMES = 50;
private static final IgniteLogger LOG = IgniteLogger.forClass(LogManagerImpl.class);
/** Raft group id. */
String groupId;
private LogStorage logStorage;
private ConfigurationManager configManager;
private FSMCaller fsmCaller;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock writeLock = this.lock.writeLock();
private final Lock readLock = this.lock.readLock();
private volatile boolean stopped;
private volatile boolean hasError;
private long nextWaitId;
private LogId diskId = new LogId(0, 0); // Last log entry written to disk.
private LogId appliedId = new LogId(0, 0);
private final SegmentList<LogEntry> logsInMemory = new SegmentList<>(true);
private volatile long firstLogIndex;
private volatile long lastLogIndex;
private volatile LogId lastSnapshotId = new LogId(0, 0);
private final Map<Long, WaitMeta> waitMap = new HashMap<>();
private StripedDisruptor<StableClosureEvent> disruptor;
private RingBuffer<StableClosureEvent> diskQueue;
private RaftOptions raftOptions;
private volatile CountDownLatch shutDownLatch;
private NodeMetrics nodeMetrics;
private final CopyOnWriteArrayList<LastLogIndexListener> lastLogIndexListeners = new CopyOnWriteArrayList<>();
private NodeOptions nodeOptions;
private enum EventType {
OTHER, // other event type.
RESET, // reset
TRUNCATE_PREFIX, // truncate log from prefix
TRUNCATE_SUFFIX, // truncate log from suffix
SHUTDOWN, // shutdown the log manager
LAST_LOG_ID // get last log id
}
public static class StableClosureEvent implements GroupAware {
/** Raft group id. */
String groupId;
StableClosure done;
EventType type;
/** {@inheritDoc} */
@Override public String groupId() {
return groupId;
}
void reset() {
this.groupId = null;
this.done = null;
this.type = null;
}
}
/**
* Waiter metadata
*/
private static class WaitMeta {
/**
* Callback invoked when a new log entry comes in.
*/
NewLogCallback onNewLog;
/**
* Callback error code.
*/
int errorCode;
/**
* Argument passed in by the waiter.
*/
Object arg;
WaitMeta(final NewLogCallback onNewLog, final Object arg, final int errorCode) {
super();
this.onNewLog = onNewLog;
this.arg = arg;
this.errorCode = errorCode;
}
}
@Override
public void addLastLogIndexListener(final LastLogIndexListener listener) {
this.lastLogIndexListeners.add(listener);
}
@Override
public void removeLastLogIndexListener(final LastLogIndexListener listener) {
this.lastLogIndexListeners.remove(listener);
}
@Override
public boolean init(final LogManagerOptions opts) {
this.writeLock.lock();
try {
if (opts.getLogStorage() == null) {
LOG.error("Fail to init log manager, log storage is null");
return false;
}
this.raftOptions = opts.getRaftOptions();
this.nodeMetrics = opts.getNodeMetrics();
this.logStorage = opts.getLogStorage();
this.configManager = opts.getConfigurationManager();
this.nodeOptions = opts.getNode().getOptions();
this.groupId = opts.getGroupId();
LogStorageOptions lsOpts = new LogStorageOptions();
lsOpts.setConfigurationManager(this.configManager);
lsOpts.setLogEntryCodecFactory(opts.getLogEntryCodecFactory());
if (!this.logStorage.init(lsOpts)) {
LOG.error("Fail to init logStorage");
return false;
}
this.firstLogIndex = this.logStorage.getFirstLogIndex();
this.lastLogIndex = this.logStorage.getLastLogIndex();
this.diskId = new LogId(this.lastLogIndex, getTermFromLogStorage(this.lastLogIndex));
this.fsmCaller = opts.getFsmCaller();
this.disruptor = opts.getLogManagerDisruptor();
this.diskQueue = disruptor.subscribe(groupId, new StableClosureEventHandler(),
(event, ex) -> reportError(-1, "LogManager handle event error"));
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry().register("jraft-log-manager-disruptor",
new DisruptorMetricSet(this.diskQueue));
}
}
finally {
this.writeLock.unlock();
}
return true;
}
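/**
* Publishes a SHUTDOWN event to the disk queue; the event handler flushes pending
* entries and counts down {@code shutDownLatch} when it processes the event.
*/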
private void stopDiskThread() {
this.shutDownLatch = new CountDownLatch(1);
Utils.runInThread(nodeOptions.getCommonExecutor(), () -> this.diskQueue.publishEvent((event, sequence) -> {
event.reset();
event.groupId = groupId;
event.type = EventType.SHUTDOWN;
}));
}
@Override
public void join() throws InterruptedException {
if (this.shutDownLatch == null) {
return;
}
this.shutDownLatch.await();
this.disruptor.unsubscribe(groupId);
}
@Override
public void shutdown() {
boolean doUnlock = true;
this.writeLock.lock();
try {
if (this.stopped) {
return;
}
this.stopped = true;
doUnlock = false;
wakeupAllWaiter(this.writeLock);
}
finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
stopDiskThread();
}
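/**
* Removes from memory all log entries whose id is less than or equal to the given one.
*/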
private void clearMemoryLogs(final LogId id) {
this.writeLock.lock();
try {
this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().compareTo(id) <= 0);
}
finally {
this.writeLock.unlock();
}
}
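/**
* Closure that captures the last log id flushed to storage and lets the caller await it.
*/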
private static class LastLogIdClosure extends StableClosure {
LastLogIdClosure() {
super(null);
}
private LogId lastLogId;
void setLastLogId(final LogId logId) {
Requires.requireTrue(logId.getIndex() == 0 || logId.getTerm() != 0);
this.lastLogId = logId;
}
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run(final Status status) {
this.latch.countDown();
}
void await() throws InterruptedException {
this.latch.await();
}
}
@Override
public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
Requires.requireNonNull(done, "done");
if (this.hasError) {
entries.clear();
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.EIO, "Corrupted LogStorage"));
return;
}
boolean doUnlock = true;
this.writeLock.lock();
try {
if (!entries.isEmpty() && !checkAndResolveConflict(entries, done)) {
// If checkAndResolveConflict returns false, the done closure has already been invoked inside it.
entries.clear();
return;
}
for (int i = 0; i < entries.size(); i++) {
final LogEntry entry = entries.get(i);
// Set checksum after checkAndResolveConflict
if (this.raftOptions.isEnableLogEntryChecksum()) {
entry.setChecksum(entry.checksum());
}
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
Configuration oldConf = new Configuration();
if (entry.getOldPeers() != null) {
oldConf = new Configuration(entry.getOldPeers(), entry.getOldLearners());
}
final ConfigurationEntry conf = new ConfigurationEntry(entry.getId(),
new Configuration(entry.getPeers(), entry.getLearners()), oldConf);
this.configManager.add(conf);
}
}
if (!entries.isEmpty()) {
done.setFirstLogIndex(entries.get(0).getId().getIndex());
this.logsInMemory.addAll(entries);
}
done.setEntries(entries);
int retryTimes = 0;
final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
event.reset();
event.groupId = groupId;
event.type = EventType.OTHER;
event.done = done;
};
while (true) {
if (tryOfferEvent(done, translator)) {
break;
}
else {
retryTimes++;
if (retryTimes > APPEND_LOG_RETRY_TIMES) {
reportError(RaftError.EBUSY.getNumber(), "LogManager is busy, disk queue overload.");
return;
}
ThreadHelper.onSpinWait();
}
}
doUnlock = false;
if (!wakeupAllWaiter(this.writeLock)) {
notifyLastLogIndexListeners();
}
}
finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
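/**
* Publishes an event of the given type to the disk queue, failing the closure with
* ESTOP if the manager is stopped or EBUSY if the queue is full.
*/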
private void offerEvent(final StableClosure done, final EventType type) {
if (this.stopped) {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
return;
}
if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
event.reset();
event.groupId = groupId;
event.type = type;
event.done = done;
})) {
reportError(RaftError.EBUSY.getNumber(), "Log manager is overload.");
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.EBUSY, "Log manager is overload."));
}
}
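/**
* Tries to publish an event via the given translator. Returns {@code true} if the manager
* is stopped (the closure is failed with ESTOP) or the event was published, and
* {@code false} if the disk queue is currently full.
*/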
private boolean tryOfferEvent(final StableClosure done, final EventTranslator<StableClosureEvent> translator) {
if (this.stopped) {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
return true;
}
return this.diskQueue.tryPublishEvent(translator);
}
private void notifyLastLogIndexListeners() {
for (LastLogIndexListener listener : lastLogIndexListeners) {
if (listener != null) {
try {
listener.onLastLogIndexChanged(this.lastLogIndex);
}
catch (final Exception e) {
LOG.error("Fail to notify LastLogIndexListener, listener={}, index={}", listener, this.lastLogIndex);
}
}
}
}
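/**
* Wakes up all registered waiters and releases the given lock. Returns {@code true}
* if at least one waiter was notified.
*/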
private boolean wakeupAllWaiter(final Lock lock) {
if (this.waitMap.isEmpty()) {
lock.unlock();
return false;
}
final List<WaitMeta> wms = new ArrayList<>(this.waitMap.values());
final int errCode = this.stopped ? RaftError.ESTOP.getNumber() : RaftError.SUCCESS.getNumber();
this.waitMap.clear();
lock.unlock();
final int waiterCount = wms.size();
for (int i = 0; i < waiterCount; i++) {
final WaitMeta wm = wms.get(i);
wm.errorCode = errCode;
Utils.runInThread(nodeOptions.getCommonExecutor(), () -> runOnNewLog(wm));
}
return true;
}
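/**
* Appends the given entries to the underlying {@link LogStorage} and returns the id
* of the last entry written, or {@code null} if nothing was appended.
*/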
private LogId appendToStorage(final List<LogEntry> toAppend) {
LogId lastId = null;
if (!this.hasError) {
final long startMs = Utils.monotonicMs();
final int entriesCount = toAppend.size();
this.nodeMetrics.recordSize("append-logs-count", entriesCount);
try {
int writtenSize = 0;
for (int i = 0; i < entriesCount; i++) {
final LogEntry entry = toAppend.get(i);
writtenSize += entry.getData() != null ? entry.getData().remaining() : 0;
}
this.nodeMetrics.recordSize("append-logs-bytes", writtenSize);
final int nAppend = this.logStorage.appendEntries(toAppend);
if (nAppend != entriesCount) {
LOG.error("**Critical error**, fail to appendEntries, nAppend={}, toAppend={}", nAppend,
toAppend.size());
reportError(RaftError.EIO.getNumber(), "Fail to append log entries");
}
if (nAppend > 0) {
lastId = toAppend.get(nAppend - 1).getId();
}
toAppend.clear();
}
finally {
this.nodeMetrics.recordLatency("append-logs", Utils.monotonicMs() - startMs);
}
}
return lastId;
}
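/**
* Batches stable closures and their log entries, flushing them to storage when the
* closure cap or the max append buffer size is reached.
*/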
private class AppendBatcher {
List<StableClosure> storage;
int cap;
int size;
int bufferSize;
List<LogEntry> toAppend;
LogId lastId;
AppendBatcher(final List<StableClosure> storage, final int cap, final List<LogEntry> toAppend,
final LogId lastId) {
super();
this.storage = storage;
this.cap = cap;
this.toAppend = toAppend;
this.lastId = lastId;
}
LogId flush() {
if (this.size > 0) {
this.lastId = appendToStorage(this.toAppend);
for (int i = 0; i < this.size; i++) {
this.storage.get(i).getEntries().clear();
Status st = null;
try {
if (LogManagerImpl.this.hasError) {
st = new Status(RaftError.EIO, "Corrupted LogStorage");
}
else {
st = Status.OK();
}
this.storage.get(i).run(st);
}
catch (Throwable t) {
LOG.error("Fail to run closure with status: {}.", t, st);
}
}
this.toAppend.clear();
this.storage.clear();
}
this.size = 0;
this.bufferSize = 0;
return this.lastId;
}
void append(final StableClosure done) {
if (this.size == this.cap || this.bufferSize >= LogManagerImpl.this.raftOptions.getMaxAppendBufferSize()) {
flush();
}
this.storage.add(done);
this.size++;
this.toAppend.addAll(done.getEntries());
for (final LogEntry entry : done.getEntries()) {
this.bufferSize += entry.getData() != null ? entry.getData().remaining() : 0;
}
}
}
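/**
* Disruptor handler that drains the disk queue: batches append events, executes
* truncate/reset/last-log-id requests, and advances {@code diskId} at the end of each batch.
*/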
private class StableClosureEventHandler implements EventHandler<StableClosureEvent> {
LogId lastId = LogManagerImpl.this.diskId;
List<StableClosure> storage = new ArrayList<>(256);
AppendBatcher ab = new AppendBatcher(this.storage, 256, new ArrayList<>(), LogManagerImpl.this.diskId);
@Override
public void onEvent(final StableClosureEvent event, final long sequence, final boolean endOfBatch)
throws Exception {
if (event.type == EventType.SHUTDOWN) {
this.lastId = this.ab.flush();
setDiskId(this.lastId);
LogManagerImpl.this.shutDownLatch.countDown();
return;
}
final StableClosure done = event.done;
if (done.getEntries() != null && !done.getEntries().isEmpty()) {
this.ab.append(done);
}
else {
this.lastId = this.ab.flush();
boolean ret = true;
switch (event.type) {
case LAST_LOG_ID:
((LastLogIdClosure) done).setLastLogId(this.lastId.copy());
break;
case TRUNCATE_PREFIX:
long startMs = Utils.monotonicMs();
try {
final TruncatePrefixClosure tpc = (TruncatePrefixClosure) done;
LOG.debug("Truncating storage to firstIndexKept={}.", tpc.firstIndexKept);
ret = LogManagerImpl.this.logStorage.truncatePrefix(tpc.firstIndexKept);
}
finally {
LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-prefix", Utils.monotonicMs()
- startMs);
}
break;
case TRUNCATE_SUFFIX:
startMs = Utils.monotonicMs();
try {
final TruncateSuffixClosure tsc = (TruncateSuffixClosure) done;
LOG.warn("Truncating storage to lastIndexKept={}.", tsc.lastIndexKept);
ret = LogManagerImpl.this.logStorage.truncateSuffix(tsc.lastIndexKept);
if (ret) {
this.lastId.setIndex(tsc.lastIndexKept);
this.lastId.setTerm(tsc.lastTermKept);
Requires.requireTrue(this.lastId.getIndex() == 0 || this.lastId.getTerm() != 0);
}
}
finally {
LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-suffix", Utils.monotonicMs()
- startMs);
}
break;
case RESET:
final ResetClosure rc = (ResetClosure) done;
LOG.info("Resetting storage to nextLogIndex={}.", rc.nextLogIndex);
ret = LogManagerImpl.this.logStorage.reset(rc.nextLogIndex);
break;
default:
break;
}
if (!ret) {
reportError(RaftError.EIO.getNumber(), "Failed operation in LogStorage");
}
else {
done.run(Status.OK());
}
}
if (endOfBatch) {
this.lastId = this.ab.flush();
setDiskId(this.lastId);
}
}
}
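/**
* Marks the log manager as failed and propagates a log-storage error to the FSM caller.
*/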
private void reportError(final int code, final String fmt, final Object... args) {
this.hasError = true;
final RaftException error = new RaftException(ErrorType.ERROR_TYPE_LOG);
error.setStatus(new Status(code, fmt, args));
this.fsmCaller.onError(error);
}
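/**
* Advances the last flushed-to-disk id and clears in-memory entries that are both
* flushed and applied.
*/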
private void setDiskId(final LogId id) {
if (id == null) {
return;
}
LogId clearId;
this.writeLock.lock();
try {
if (id.compareTo(this.diskId) < 0) {
return;
}
this.diskId = id;
clearId = this.diskId.compareTo(this.appliedId) <= 0 ? this.diskId : this.appliedId;
}
finally {
this.writeLock.unlock();
}
if (clearId != null) {
clearMemoryLogs(clearId);
}
}
@Override
public void setSnapshot(final SnapshotMeta meta) {
LOG.debug("set snapshot: {}.", meta);
this.writeLock.lock();
try {
if (meta.lastIncludedIndex() <= this.lastSnapshotId.getIndex()) {
return;
}
final Configuration conf = confFromMeta(meta);
final Configuration oldConf = oldConfFromMeta(meta);
final ConfigurationEntry entry = new ConfigurationEntry(new LogId(meta.lastIncludedIndex(),
meta.lastIncludedTerm()), conf, oldConf);
this.configManager.setSnapshot(entry);
final long term = unsafeGetTerm(meta.lastIncludedIndex());
final long savedLastSnapshotIndex = this.lastSnapshotId.getIndex();
this.lastSnapshotId.setIndex(meta.lastIncludedIndex());
this.lastSnapshotId.setTerm(meta.lastIncludedTerm());
if (this.lastSnapshotId.compareTo(this.appliedId) > 0) {
this.appliedId = this.lastSnapshotId.copy();
}
// NOTICE: do not update disk_id here, because we are not sure whether this node
// really has these logs on disk storage. Just leave disk_id as it was, which keeps
// these logs in memory until they are flushed to disk. This way we avoid some
// corner cases where fetching logs could fail.
// See https://github.com/baidu/braft/pull/224/commits/8ef6fdbf70d23f5a4ee147356a889e2c0fa22aac
// if (this.lastSnapshotId.compareTo(this.diskId) > 0) {
// this.diskId = this.lastSnapshotId.copy();
// }
if (term == 0) {
// last_included_index is larger than last_index
// FIXME: what if last_included_index is less than first_index?
truncatePrefix(meta.lastIncludedIndex() + 1);
}
else if (term == meta.lastIncludedTerm()) {
// Truncating log to the index of the last snapshot.
// We don't truncate logs before the last snapshot immediately, since
// some logs around last_snapshot_index are probably still needed by
// some followers.
// TODO: is this still needed? TODO asch
if (savedLastSnapshotIndex > 0) {
truncatePrefix(savedLastSnapshotIndex + 1);
}
}
else {
if (!reset(meta.lastIncludedIndex() + 1)) {
LOG.warn("Reset log manager failed, nextLogIndex={}.", meta.lastIncludedIndex() + 1);
}
}
}
finally {
this.writeLock.unlock();
}
}
private Configuration oldConfFromMeta(final SnapshotMeta meta) {
final Configuration oldConf = new Configuration();
if (meta.oldPeersList() != null) {
for (String oldPeer : meta.oldPeersList()) {
final PeerId peer = new PeerId();
peer.parse(oldPeer);
oldConf.addPeer(peer);
}
}
if (meta.oldLearnersList() != null) {
for (String oldLearner : meta.oldLearnersList()) {
final PeerId peer = new PeerId();
peer.parse(oldLearner);
oldConf.addLearner(peer);
}
}
return oldConf;
}
private Configuration confFromMeta(final SnapshotMeta meta) {
final Configuration conf = new Configuration();
if (meta.peersList() != null) {
for (String metaPeer : meta.peersList()) {
final PeerId peer = new PeerId();
peer.parse(metaPeer);
conf.addPeer(peer);
}
}
if (meta.learnersList() != null) {
for (String learner : meta.learnersList()) {
final PeerId peer = new PeerId();
peer.parse(learner);
conf.addLearner(peer);
}
}
return conf;
}
@Override
public void clearBufferedLogs() {
this.writeLock.lock();
try {
if (this.lastSnapshotId.getIndex() != 0) {
truncatePrefix(this.lastSnapshotId.getIndex() + 1);
}
}
finally {
this.writeLock.unlock();
}
}
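/**
* Builds a human-readable description of the in-memory log entries, used in error messages.
*/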
private String descLogsInMemory() {
final StringBuilder sb = new StringBuilder();
boolean wasFirst = true;
for (int i = 0; i < this.logsInMemory.size(); i++) {
LogEntry logEntry = this.logsInMemory.get(i);
if (!wasFirst) {
sb.append(",");
}
else {
wasFirst = false;
}
sb.append("<id:(").append(logEntry.getId().getTerm()).append(",").append(logEntry.getId().getIndex())
.append("),type:").append(logEntry.getType()).append(">");
}
return sb.toString();
}
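/**
* Returns the log entry at the given index from the in-memory cache, or {@code null}
* if it is not cached.
*/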
protected LogEntry getEntryFromMemory(final long index) {
LogEntry entry = null;
if (!this.logsInMemory.isEmpty()) {
final long firstIndex = this.logsInMemory.peekFirst().getId().getIndex();
final long lastIndex = this.logsInMemory.peekLast().getId().getIndex();
if (lastIndex - firstIndex + 1 != this.logsInMemory.size()) {
throw new IllegalStateException(String.format("lastIndex=%d,firstIndex=%d,logsInMemory=[%s]",
lastIndex, firstIndex, descLogsInMemory()));
}
if (index >= firstIndex && index <= lastIndex) {
entry = this.logsInMemory.get((int) (index - firstIndex));
}
}
return entry;
}
@Override
public LogEntry getEntry(final long index) {
this.readLock.lock();
try {
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return null;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry;
}
}
finally {
this.readLock.unlock();
}
final LogEntry entry = this.logStorage.getEntry(index);
if (entry == null) {
reportError(RaftError.EIO.getNumber(), "Corrupted entry at index=%d, not found", index);
}
// Validate checksum
if (entry != null && this.raftOptions.isEnableLogEntryChecksum() && entry.isCorrupted()) {
String msg = String.format("Corrupted entry at index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
index, entry.getId().getTerm(), entry.getChecksum(), entry.checksum());
// Report error to node and throw exception.
reportError(RaftError.EIO.getNumber(), msg);
throw new LogEntryCorruptedException(msg);
}
return entry;
}
@Override
public long getTerm(final long index) {
if (index == 0) {
return 0;
}
this.readLock.lock();
try {
// If the index equals snapshot_index, return snapshot_term.
if (index == this.lastSnapshotId.getIndex()) {
return this.lastSnapshotId.getTerm();
}
// Out of range, return 0 directly.
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return 0;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry.getId().getTerm();
}
}
finally {
this.readLock.unlock();
}
return getTermFromLogStorage(index);
}
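/**
* Reads the term of the entry at the given index directly from the log storage,
* validating the checksum if enabled. Returns 0 if the entry does not exist.
*/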
private long getTermFromLogStorage(final long index) {
final LogEntry entry = this.logStorage.getEntry(index);
if (entry != null) {
if (this.raftOptions.isEnableLogEntryChecksum() && entry.isCorrupted()) {
// Report error to node and throw exception.
final String msg = String.format(
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d", entry
.getId().getIndex(), entry.getId().getTerm(), entry.getChecksum(), entry.checksum());
reportError(RaftError.EIO.getNumber(), msg);
throw new LogEntryCorruptedException(msg);
}
return entry.getId().getTerm();
}
return 0;
}
@Override
public long getFirstLogIndex() {
this.readLock.lock();
try {
return this.firstLogIndex;
}
finally {
this.readLock.unlock();
}
}
@Override
public long getLastLogIndex() {
return getLastLogIndex(false);
}
@Override
public long getLastLogIndex(final boolean isFlush) {
LastLogIdClosure c;
this.readLock.lock();
try {
if (!isFlush) {
return this.lastLogIndex;
}
else {
if (this.lastLogIndex == this.lastSnapshotId.getIndex()) {
return this.lastLogIndex;
}
c = new LastLogIdClosure();
offerEvent(c, EventType.LAST_LOG_ID);
}
}
finally {
this.readLock.unlock();
}
try {
c.await();
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
return c.lastLogId.getIndex();
}
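/**
* Returns the term for the given index without acquiring the read lock; callers must
* already hold the appropriate lock.
*/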
private long unsafeGetTerm(final long index) {
if (index == 0) {
return 0;
}
final LogId lss = this.lastSnapshotId;
if (index == lss.getIndex()) {
return lss.getTerm();
}
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return 0;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry.getId().getTerm();
}
return getTermFromLogStorage(index);
}
@Override
public LogId getLastLogId(final boolean isFlush) {
LastLogIdClosure c;
this.readLock.lock();
try {
if (!isFlush) {
if (this.lastLogIndex >= this.firstLogIndex) {
return new LogId(this.lastLogIndex, unsafeGetTerm(this.lastLogIndex));
}
return this.lastSnapshotId;
}
else {
if (this.lastLogIndex == this.lastSnapshotId.getIndex()) {
return this.lastSnapshotId;
}
c = new LastLogIdClosure();
offerEvent(c, EventType.LAST_LOG_ID);
}
}
finally {
this.readLock.unlock();
}
try {
c.await();
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
return c.lastLogId;
}
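/**
* Closure used to request a prefix truncation on the storage thread.
*/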
private static class TruncatePrefixClosure extends StableClosure {
long firstIndexKept;
TruncatePrefixClosure(final long firstIndexKept) {
super(null);
this.firstIndexKept = firstIndexKept;
}
@Override
public void run(final Status status) {
}
}
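/**
* Closure used to request a suffix truncation on the storage thread.
*/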
private static class TruncateSuffixClosure extends StableClosure {
long lastIndexKept;
long lastTermKept;
TruncateSuffixClosure(final long lastIndexKept, final long lastTermKept) {
super(null);
this.lastIndexKept = lastIndexKept;
this.lastTermKept = lastTermKept;
}
@Override
public void run(final Status status) {
}
}
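/**
* Closure used to request a storage reset on the storage thread.
*/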
private static class ResetClosure extends StableClosure {
long nextLogIndex;
ResetClosure(final long nextLogIndex) {
super(null);
this.nextLogIndex = nextLogIndex;
}
@Override
public void run(final Status status) {
}
}
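/**
* Truncates the in-memory and stored logs so that {@code firstIndexKept} becomes the first
* index; the actual storage truncation is performed asynchronously on the disk queue.
*/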
private boolean truncatePrefix(final long firstIndexKept) {
this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);
// TODO maybe it's fine here
Requires.requireTrue(firstIndexKept >= this.firstLogIndex,
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);
this.firstLogIndex = firstIndexKept;
if (firstIndexKept > this.lastLogIndex) {
// The entry log is dropped
this.lastLogIndex = firstIndexKept - 1;
}
LOG.debug("Truncate prefix, firstIndexKept is :{}", firstIndexKept);
this.configManager.truncatePrefix(firstIndexKept);
final TruncatePrefixClosure c = new TruncatePrefixClosure(firstIndexKept);
offerEvent(c, EventType.TRUNCATE_PREFIX);
return true;
}
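/**
* Drops all logs and resets the manager so that the next log index is {@code nextLogIndex};
* the storage reset itself is performed asynchronously on the disk queue.
*/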
private boolean reset(final long nextLogIndex) {
this.writeLock.lock();
try {
this.logsInMemory.clear();
this.firstLogIndex = nextLogIndex;
this.lastLogIndex = nextLogIndex - 1;
this.configManager.truncatePrefix(this.firstLogIndex);
this.configManager.truncateSuffix(this.lastLogIndex);
final ResetClosure c = new ResetClosure(nextLogIndex);
offerEvent(c, EventType.RESET);
return true;
}
finally {
this.writeLock.unlock();
}
}
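/**
* Truncates logs after {@code lastIndexKept}; must be called with the write lock held and
* never truncates below the applied id.
*/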
private void unsafeTruncateSuffix(final long lastIndexKept) {
if (lastIndexKept < this.appliedId.getIndex()) {
LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId,
lastIndexKept);
return;
}
this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);
this.lastLogIndex = lastIndexKept;
final long lastTermKept = unsafeGetTerm(lastIndexKept);
Requires.requireTrue(this.lastLogIndex == 0 || lastTermKept != 0);
LOG.debug("Truncate suffix :{}", lastIndexKept);
this.configManager.truncateSuffix(lastIndexKept);
final TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept);
offerEvent(c, EventType.TRUNCATE_SUFFIX);
}
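/**
* Resolves conflicts between the incoming entries and the local log: assigns indexes on the
* leader path, truncates conflicting suffixes on the follower path, and fails {@code done}
* if there is a gap. Returns {@code false} if {@code done} has already been invoked.
*/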
@SuppressWarnings("NonAtomicOperationOnVolatileField")
private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done) {
final LogEntry firstLogEntry = ArrayDeque.peekFirst(entries);
if (firstLogEntry.getId().getIndex() == 0) {
// The node is currently the leader and |entries| come from the user, who
// doesn't know the correct indexes the logs should be assigned. So we
// have to assign indexes to the appended entries.
for (int i = 0; i < entries.size(); i++) {
entries.get(i).getId().setIndex(++this.lastLogIndex);
}
return true;
}
else {
// Node is currently a follower and |entries| are from the leader. We
// should check and resolve the conflicts between the local logs and
// |entries|
if (firstLogEntry.getId().getIndex() > this.lastLogIndex + 1) {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.EINVAL,
"There's gap between first_index=%d and last_log_index=%d", firstLogEntry.getId().getIndex(),
this.lastLogIndex));
return false;
}
final long appliedIndex = this.appliedId.getIndex();
final LogEntry lastLogEntry = ArrayDeque.peekLast(entries);
if (lastLogEntry.getId().getIndex() <= appliedIndex) {
LOG.warn(
"Received entries of which the lastLog={} is not greater than appliedIndex={}, return immediately with nothing changed.",
lastLogEntry.getId().getIndex(), appliedIndex);
// Replicating old logs before appliedIndex should be considered successful; respond OK.
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done);
return false;
}
if (firstLogEntry.getId().getIndex() == this.lastLogIndex + 1) {
// fast path
this.lastLogIndex = lastLogEntry.getId().getIndex();
}
else {
// The appending entries overlap the local ones. We should find whether
// there is a conflicting index from which the local entries should be
// truncated.
int conflictingIndex = 0;
for (; conflictingIndex < entries.size(); conflictingIndex++) {
if (unsafeGetTerm(entries.get(conflictingIndex).getId().getIndex()) != entries
.get(conflictingIndex).getId().getTerm()) {
break;
}
}
if (conflictingIndex != entries.size()) {
if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
// Truncate all the conflicting entries to make the local logs
// consistent with the leader's.
unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1);
}
this.lastLogIndex = lastLogEntry.getId().getIndex();
} // else this is a duplicated AppendEntriesRequest, we have
// nothing to do besides releasing all the entries
if (conflictingIndex > 0) {
// Remove duplication
entries.subList(0, conflictingIndex).clear();
}
}
return true;
}
}
@Override
public ConfigurationEntry getConfiguration(final long index) {
this.readLock.lock();
try {
return this.configManager.get(index);
}
finally {
this.readLock.unlock();
}
}
@Override
public ConfigurationEntry checkAndSetConfiguration(final ConfigurationEntry current) {
if (current == null) {
return null;
}
this.readLock.lock();
try {
final ConfigurationEntry lastConf = this.configManager.getLastConfiguration();
if (lastConf != null && !lastConf.isEmpty() && !current.getId().equals(lastConf.getId())) {
return lastConf;
}
}
finally {
this.readLock.unlock();
}
return current;
}
@Override
public long wait(final long expectedLastLogIndex, final NewLogCallback cb, final Object arg) {
final WaitMeta wm = new WaitMeta(cb, arg, 0);
return notifyOnNewLog(expectedLastLogIndex, wm);
}
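/**
* Registers a waiter for new logs, or notifies it immediately if the last log index already
* differs from {@code expectedLastLogIndex} or the manager is stopped. Returns the waiter id,
* or 0 if the waiter was notified immediately.
*/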
private long notifyOnNewLog(final long expectedLastLogIndex, final WaitMeta wm) {
this.writeLock.lock();
try {
if (expectedLastLogIndex != this.lastLogIndex || this.stopped) {
wm.errorCode = this.stopped ? RaftError.ESTOP.getNumber() : 0;
Utils.runInThread(nodeOptions.getCommonExecutor(), () -> runOnNewLog(wm));
return 0L;
}
if (this.nextWaitId == 0) { //skip 0
++this.nextWaitId;
}
final long waitId = this.nextWaitId++;
this.waitMap.put(waitId, wm);
return waitId;
}
finally {
this.writeLock.unlock();
}
}
@Override
public boolean removeWaiter(final long id) {
this.writeLock.lock();
try {
return this.waitMap.remove(id) != null;
}
finally {
this.writeLock.unlock();
}
}
@Override
public void setAppliedId(final LogId appliedId) {
LogId clearId;
this.writeLock.lock();
try {
if (appliedId.compareTo(this.appliedId) < 0) {
return;
}
this.appliedId = appliedId.copy();
clearId = this.diskId.compareTo(this.appliedId) <= 0 ? this.diskId : this.appliedId;
}
finally {
this.writeLock.unlock();
}
if (clearId != null) {
clearMemoryLogs(clearId);
}
}
void runOnNewLog(final WaitMeta wm) {
wm.onNewLog.onNewLog(wm.arg, wm.errorCode);
}
@Override
public Status checkConsistency() {
this.readLock.lock();
try {
Requires.requireTrue(this.firstLogIndex > 0);
Requires.requireTrue(this.lastLogIndex >= 0);
if (this.lastSnapshotId.equals(new LogId(0, 0))) {
if (this.firstLogIndex == 1) {
return Status.OK();
}
return new Status(RaftError.EIO, "Missing logs in (0, %d)", this.firstLogIndex);
}
else {
if (this.lastSnapshotId.getIndex() >= this.firstLogIndex - 1
&& this.lastSnapshotId.getIndex() <= this.lastLogIndex) {
return Status.OK();
}
return new Status(RaftError.EIO, "There's a gap between snapshot={%d, %d} and log=[%d, %d] ",
this.lastSnapshotId.toString(), this.lastSnapshotId.getTerm(), this.firstLogIndex,
this.lastLogIndex);
}
}
finally {
this.readLock.unlock();
}
}
@Override
public void describe(final Printer out) {
final long _firstLogIndex;
final long _lastLogIndex;
final String _diskId;
final String _appliedId;
final String _lastSnapshotId;
this.readLock.lock();
try {
_firstLogIndex = this.firstLogIndex;
_lastLogIndex = this.lastLogIndex;
_diskId = String.valueOf(this.diskId);
_appliedId = String.valueOf(this.appliedId);
_lastSnapshotId = String.valueOf(this.lastSnapshotId);
}
finally {
this.readLock.unlock();
}
out.print(" storage: [") //
.print(_firstLogIndex) //
.print(", ") //
.print(_lastLogIndex) //
.println(']');
out.print(" diskId: ") //
.println(_diskId);
out.print(" appliedId: ") //
.println(_appliedId);
out.print(" lastSnapshotId: ") //
.println(_lastSnapshotId);
}
}