blob: 4d932131dc47cf7f6dbdb162e458b11bf418f024 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.core;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.ClosureQueue;
import org.apache.ignite.raft.jraft.closure.LoadSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.disruptor.GroupAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.entity.SnapshotMetaBuilder;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.FSMCallerOptions;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.Utils;
import static java.util.stream.Collectors.toList;
/**
* The finite state machine caller implementation.
*/
public class FSMCallerImpl implements FSMCaller {
private static final IgniteLogger LOG = IgniteLogger.forClass(FSMCallerImpl.class);
/**
* Task type
*/
private enum TaskType {
IDLE, //
COMMITTED, //
SNAPSHOT_SAVE, //
SNAPSHOT_LOAD, //
LEADER_STOP, //
LEADER_START, //
START_FOLLOWING, //
STOP_FOLLOWING, //
SHUTDOWN, //
FLUSH, //
ERROR;
private String metricName;
public String metricName() {
if (this.metricName == null) {
this.metricName = "fsm-" + name().toLowerCase().replaceAll("_", "-");
}
return this.metricName;
}
}
/**
* Apply task for disruptor.
*/
public static class ApplyTask implements GroupAware {
/** Raft group id. */
String groupId;
TaskType type;
// union fields
long committedIndex;
long term;
Status status;
LeaderChangeContext leaderChangeCtx;
Closure done;
CountDownLatch shutdownLatch;
/** {@inheritDoc} */
@Override public String groupId() {
return groupId;
}
public void reset() {
this.type = null;
this.committedIndex = 0;
this.term = 0;
this.status = null;
this.leaderChangeCtx = null;
this.done = null;
this.shutdownLatch = null;
this.groupId = null;
}
}
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
// max committed index in current batch, reset to -1 every batch
private long maxCommittedIndex = -1;
@Override
public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
}
}
/** Raft group id. */
String groupId;
private LogManager logManager;
private StateMachine fsm;
private ClosureQueue closureQueue;
private final AtomicLong lastAppliedIndex;
private long lastAppliedTerm;
private Closure afterShutdown;
private NodeImpl node;
private volatile TaskType currTask;
private final AtomicLong applyingIndex;
private volatile RaftException error;
private StripedDisruptor<ApplyTask> disruptor;
private RingBuffer<ApplyTask> taskQueue;
private volatile CountDownLatch shutdownLatch;
private NodeMetrics nodeMetrics;
private final CopyOnWriteArrayList<LastAppliedLogIndexListener> lastAppliedLogIndexListeners = new CopyOnWriteArrayList<>();
private RaftMessagesFactory msgFactory;
public FSMCallerImpl() {
super();
this.currTask = TaskType.IDLE;
this.lastAppliedIndex = new AtomicLong(0);
this.applyingIndex = new AtomicLong(0);
}
@Override
public boolean init(final FSMCallerOptions opts) {
this.groupId = opts.getGroupId();
this.logManager = opts.getLogManager();
this.fsm = opts.getFsm();
this.closureQueue = opts.getClosureQueue();
this.afterShutdown = opts.getAfterShutdown();
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
this.lastAppliedTerm = opts.getBootstrapId().getTerm();
disruptor = opts.getfSMCallerExecutorDisruptor();
taskQueue = disruptor.subscribe(groupId, new ApplyTaskHandler());
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry().register("jraft-fsm-caller-disruptor",
new DisruptorMetricSet(this.taskQueue));
}
this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
this.msgFactory = opts.getRaftMessagesFactory();
LOG.info("Starts FSMCaller successfully.");
return true;
}
@Override
public synchronized void shutdown() {
if (this.shutdownLatch != null) {
return;
}
LOG.info("Shutting down FSMCaller...");
if (this.taskQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(this.node.getOptions().getCommonExecutor(), () -> this.taskQueue.publishEvent((task, sequence) -> {
task.reset();
task.groupId = groupId;
task.type = TaskType.SHUTDOWN;
task.shutdownLatch = latch;
}));
}
doShutdown();
}
@Override
public void addLastAppliedLogIndexListener(final LastAppliedLogIndexListener listener) {
this.lastAppliedLogIndexListeners.add(listener);
}
private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
if (this.shutdownLatch != null) {
// Shutting down
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
if (!this.taskQueue.tryPublishEvent(tpl)) {
setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
"FSMCaller is overload.")));
return false;
}
return true;
}
@Override
public boolean onCommitted(final long committedIndex) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.COMMITTED;
task.committedIndex = committedIndex;
});
}
/**
* Flush all events in disruptor.
*/
@OnlyForTest
void flush() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.FLUSH;
task.shutdownLatch = latch;
});
latch.await();
}
@Override
public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.SNAPSHOT_LOAD;
task.done = done;
});
}
@Override
public boolean onSnapshotSave(final SaveSnapshotClosure done) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.SNAPSHOT_SAVE;
task.done = done;
});
}
@Override
public boolean onLeaderStop(final Status status) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.LEADER_STOP;
task.status = new Status(status);
});
}
@Override
public boolean onLeaderStart(final long term) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.LEADER_START;
task.term = term;
});
}
@Override
public boolean onStartFollowing(final LeaderChangeContext ctx) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.START_FOLLOWING;
task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
});
}
@Override
public boolean onStopFollowing(final LeaderChangeContext ctx) {
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.STOP_FOLLOWING;
task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
});
}
/**
* Closure runs with an error.
*/
public class OnErrorClosure implements Closure {
private RaftException error;
public OnErrorClosure(final RaftException error) {
super();
this.error = error;
}
public RaftException getError() {
return this.error;
}
public void setError(final RaftException error) {
this.error = error;
}
@Override
public void run(final Status st) {
}
}
@Override
public boolean onError(final RaftException error) {
if (!this.error.getStatus().isOk()) {
LOG.warn("FSMCaller already in error status, ignore new error", error);
return false;
}
final OnErrorClosure c = new OnErrorClosure(error);
return enqueueTask((task, sequence) -> {
task.groupId = groupId;
task.type = TaskType.ERROR;
task.done = c;
});
}
@Override
public long getLastAppliedIndex() {
return this.lastAppliedIndex.get();
}
@Override
public synchronized void join() throws InterruptedException {
if (this.shutdownLatch != null) {
this.shutdownLatch.await();
this.disruptor.unsubscribe(groupId);
if (this.afterShutdown != null) {
this.afterShutdown.run(Status.OK());
this.afterShutdown = null;
}
this.shutdownLatch = null;
}
}
@SuppressWarnings("ConstantConditions")
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
CountDownLatch shutdown = null;
if (task.type == TaskType.COMMITTED) {
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
}
else {
if (maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
doCommitted(maxCommittedIndex);
maxCommittedIndex = -1L; // reset maxCommittedIndex
}
final long startMs = Utils.monotonicMs();
try {
switch (task.type) {
case COMMITTED:
Requires.requireTrue(false, "Impossible");
break;
case SNAPSHOT_SAVE:
this.currTask = TaskType.SNAPSHOT_SAVE;
if (passByStatus(task.done)) {
doSnapshotSave((SaveSnapshotClosure) task.done);
}
break;
case SNAPSHOT_LOAD:
this.currTask = TaskType.SNAPSHOT_LOAD;
if (passByStatus(task.done)) {
doSnapshotLoad((LoadSnapshotClosure) task.done);
}
break;
case LEADER_STOP:
this.currTask = TaskType.LEADER_STOP;
doLeaderStop(task.status);
break;
case LEADER_START:
this.currTask = TaskType.LEADER_START;
doLeaderStart(task.term);
break;
case START_FOLLOWING:
this.currTask = TaskType.START_FOLLOWING;
doStartFollowing(task.leaderChangeCtx);
break;
case STOP_FOLLOWING:
this.currTask = TaskType.STOP_FOLLOWING;
doStopFollowing(task.leaderChangeCtx);
break;
case ERROR:
this.currTask = TaskType.ERROR;
doOnError((OnErrorClosure) task.done);
break;
case IDLE:
Requires.requireTrue(false, "Can't reach here");
break;
case SHUTDOWN:
this.currTask = TaskType.SHUTDOWN;
shutdown = task.shutdownLatch;
break;
case FLUSH:
this.currTask = TaskType.FLUSH;
shutdown = task.shutdownLatch;
break;
}
}
finally {
this.nodeMetrics.recordLatency(task.type.metricName(), Utils.monotonicMs() - startMs);
}
}
try {
if (endOfBatch && maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
doCommitted(maxCommittedIndex);
maxCommittedIndex = -1L; // reset maxCommittedIndex
}
this.currTask = TaskType.IDLE;
return maxCommittedIndex;
}
finally {
if (shutdown != null) {
shutdown.countDown();
}
}
}
private void doShutdown() {
if (this.node != null) {
this.node = null;
}
if (this.fsm != null) {
this.fsm.onShutdown();
}
}
private void notifyLastAppliedIndexUpdated(final long lastAppliedIndex) {
for (final LastAppliedLogIndexListener listener : this.lastAppliedLogIndexListeners) {
listener.onApplied(lastAppliedIndex);
}
}
private void doCommitted(final long committedIndex) {
if (!this.error.getStatus().isOk()) {
return;
}
final long lastAppliedIndex = this.lastAppliedIndex.get();
// We can tolerate the disorder of committed_index
if (lastAppliedIndex >= committedIndex) {
return;
}
final long startMs = Utils.monotonicMs();
try {
final List<Closure> closures = new ArrayList<>();
final List<TaskClosure> taskClosures = new ArrayList<>();
final long firstClosureIndex = this.closureQueue.popClosureUntil(committedIndex, closures, taskClosures);
// Calls TaskClosure#onCommitted if necessary
onTaskCommitted(taskClosures);
Requires.requireTrue(firstClosureIndex >= 0, "Invalid firstClosureIndex");
final IteratorImpl iterImpl = new IteratorImpl(this.fsm, this.logManager, closures, firstClosureIndex,
lastAppliedIndex, committedIndex, this.applyingIndex, this.node.getOptions());
while (iterImpl.isGood()) {
final LogEntry logEntry = iterImpl.entry();
if (logEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
if (logEntry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
if (logEntry.getOldPeers() != null && !logEntry.getOldPeers().isEmpty()) {
// Joint stage is not supposed to be noticeable by end users.
this.fsm.onConfigurationCommitted(new Configuration(iterImpl.entry().getPeers()));
}
}
if (iterImpl.done() != null) {
// For other entries, we have nothing to do besides flush the
// pending tasks and run this closure to notify the caller that the
// entries before this one were successfully committed and applied.
iterImpl.done().run(Status.OK());
}
iterImpl.next();
continue;
}
// Apply data task to user state machine
doApplyTasks(iterImpl);
}
if (iterImpl.hasError()) {
setError(iterImpl.getError());
iterImpl.runTheRestClosureWithError();
}
final long lastIndex = iterImpl.getIndex() - 1;
final long lastTerm = this.logManager.getTerm(lastIndex);
final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
this.lastAppliedIndex.set(lastIndex);
this.lastAppliedTerm = lastTerm;
this.logManager.setAppliedId(lastAppliedId);
notifyLastAppliedIndexUpdated(lastIndex);
}
finally {
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
}
}
private void onTaskCommitted(final List<TaskClosure> closures) {
for (int i = 0, size = closures.size(); i < size; i++) {
final TaskClosure done = closures.get(i);
done.onCommitted();
}
}
private void doApplyTasks(final IteratorImpl iterImpl) {
final IteratorWrapper iter = new IteratorWrapper(iterImpl);
final long startApplyMs = Utils.monotonicMs();
final long startIndex = iter.getIndex();
try {
this.fsm.onApply(iter);
}
finally {
this.nodeMetrics.recordLatency("fsm-apply-tasks", Utils.monotonicMs() - startApplyMs);
this.nodeMetrics.recordSize("fsm-apply-tasks-count", iter.getIndex() - startIndex);
}
if (iter.hasNext()) {
LOG.error("Iterator is still valid, did you return before iterator reached the end?");
}
// Try move to next in case that we pass the same log twice.
iter.next();
}
private void doSnapshotSave(final SaveSnapshotClosure done) {
Requires.requireNonNull(done, "SaveSnapshotClosure is null");
final long lastAppliedIndex = this.lastAppliedIndex.get();
final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
if (confEntry == null || confEntry.isEmpty()) {
LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), done, new Status(RaftError.EINVAL,
"Empty conf entry for lastAppliedIndex=%s", lastAppliedIndex));
return;
}
SnapshotMetaBuilder metaBuilder = msgFactory.snapshotMeta()
.lastIncludedIndex(lastAppliedIndex)
.lastIncludedTerm(this.lastAppliedTerm)
.peersList(confEntry.getConf().getPeers().stream().map(Object::toString).collect(toList()))
.learnersList(confEntry.getConf().getLearners().stream().map(Object::toString).collect(toList()));
if (confEntry.getOldConf() != null) {
metaBuilder
.oldPeersList(confEntry.getOldConf().getPeers().stream().map(Object::toString).collect(toList()))
.oldLearnersList(confEntry.getOldConf().getLearners().stream().map(Object::toString).collect(toList()));
}
final SnapshotWriter writer = done.start(metaBuilder.build());
if (writer == null) {
done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
return;
}
this.fsm.onSnapshotSave(writer, done);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("StateMachine [");
switch (this.currTask) {
case IDLE:
sb.append("Idle");
break;
case COMMITTED:
sb.append("Applying logIndex=").append(this.applyingIndex);
break;
case SNAPSHOT_SAVE:
sb.append("Saving snapshot");
break;
case SNAPSHOT_LOAD:
sb.append("Loading snapshot");
break;
case ERROR:
sb.append("Notifying error");
break;
case LEADER_STOP:
sb.append("Notifying leader stop");
break;
case LEADER_START:
sb.append("Notifying leader start");
break;
case START_FOLLOWING:
sb.append("Notifying start following");
break;
case STOP_FOLLOWING:
sb.append("Notifying stop following");
break;
case SHUTDOWN:
sb.append("Shutting down");
break;
default:
break;
}
return sb.append(']').toString();
}
private void doSnapshotLoad(final LoadSnapshotClosure done) {
Requires.requireNonNull(done, "LoadSnapshotClosure is null");
final SnapshotReader reader = done.start();
if (reader == null) {
done.run(new Status(RaftError.EINVAL, "open SnapshotReader failed"));
return;
}
final RaftOutter.SnapshotMeta meta = reader.load();
if (meta == null) {
done.run(new Status(RaftError.EINVAL, "SnapshotReader load meta failed"));
if (reader.getRaftError() == RaftError.EIO) {
final RaftException err = new RaftException(ErrorType.ERROR_TYPE_SNAPSHOT, RaftError.EIO,
"Fail to load snapshot meta");
setError(err);
}
return;
}
final LogId lastAppliedId = new LogId(this.lastAppliedIndex.get(), this.lastAppliedTerm);
final LogId snapshotId = new LogId(meta.lastIncludedIndex(), meta.lastIncludedTerm());
if (lastAppliedId.compareTo(snapshotId) > 0) {
done.run(new Status(
RaftError.ESTALE,
"Loading a stale snapshot last_applied_index=%d last_applied_term=%d snapshot_index=%d snapshot_term=%d",
lastAppliedId.getIndex(), lastAppliedId.getTerm(), snapshotId.getIndex(), snapshotId.getTerm()));
return;
}
if (!this.fsm.onSnapshotLoad(reader)) {
done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
final RaftException e = new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE,
RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed");
setError(e);
return;
}
if (meta.oldPeersList() == null) {
// Joint stage is not supposed to be noticeable by end users.
final Configuration conf = new Configuration();
if (meta.peersList() != null) {
for (String metaPeer : meta.peersList()) {
final PeerId peer = new PeerId();
Requires.requireTrue(peer.parse(metaPeer), "Parse peer failed");
conf.addPeer(peer);
}
}
this.fsm.onConfigurationCommitted(conf);
}
this.lastAppliedIndex.set(meta.lastIncludedIndex());
this.lastAppliedTerm = meta.lastIncludedTerm();
done.run(Status.OK());
}
private void doOnError(final OnErrorClosure done) {
setError(done.getError());
}
private void doLeaderStop(final Status status) {
this.fsm.onLeaderStop(status);
}
private void doLeaderStart(final long term) {
this.fsm.onLeaderStart(term);
}
private void doStartFollowing(final LeaderChangeContext ctx) {
this.fsm.onStartFollowing(ctx);
}
private void doStopFollowing(final LeaderChangeContext ctx) {
this.fsm.onStopFollowing(ctx);
}
private void setError(final RaftException e) {
if (this.error.getType() != ErrorType.ERROR_TYPE_NONE) {
// already report
return;
}
this.error = e;
if (this.fsm != null) {
this.fsm.onError(e);
}
if (this.node != null) {
this.node.onError(e);
}
}
@OnlyForTest
RaftException getError() {
return this.error;
}
private boolean passByStatus(final Closure done) {
final Status status = this.error.getStatus();
if (!status.isOk()) {
if (done != null) {
done.run(new Status(RaftError.EINVAL, "FSMCaller is in bad status=`%s`", status));
return false;
}
}
return true;
}
@Override
public void describe(final Printer out) {
out.print(" ") //
.println(toString());
}
}