blob: 31e6f56e96fa1e120a3501e10fbc87bb354bd16c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.core;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
import org.apache.ignite.raft.jraft.ReadOnlyService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.disruptor.GroupAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.ReadIndexState;
import org.apache.ignite.raft.jraft.entity.ReadIndexStatus;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions;
import org.apache.ignite.raft.jraft.rpc.ReadIndexRequestBuilder;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Bytes;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.Utils;
/**
* Read-only service implementation.
*/
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {
private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
/** RAFT group id. */
private String groupId;
/**
* Disruptor to run readonly service.
*/
private StripedDisruptor<ReadIndexEvent> readIndexDisruptor;
private RingBuffer<ReadIndexEvent> readIndexQueue;
private RaftOptions raftOptions;
private NodeImpl node;
private final Lock lock = new ReentrantLock();
private FSMCaller fsmCaller;
private volatile CountDownLatch shutdownLatch;
private NodeMetrics nodeMetrics;
private volatile RaftException error;
// <logIndex, statusList>
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();
private static final IgniteLogger LOG = IgniteLogger.forClass(ReadOnlyServiceImpl.class);
public static class ReadIndexEvent implements GroupAware {
/** Raft group id. */
String groupId;
Bytes requestContext;
ReadIndexClosure done;
CountDownLatch shutdownLatch;
long startTime;
/** {@inheritDoc} */
@Override public String groupId() {
return groupId;
}
}
private static class ReadIndexEventFactory implements EventFactory<ReadIndexEvent> {
@Override
public ReadIndexEvent newInstance() {
return new ReadIndexEvent();
}
}
private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
// task list for batch
private final List<ReadIndexEvent> events = new ArrayList<>(
ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());
@Override
public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
throws Exception {
if (newEvent.shutdownLatch != null) {
executeReadIndexEvents(this.events);
this.events.clear();
newEvent.shutdownLatch.countDown();
return;
}
this.events.add(newEvent);
if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeReadIndexEvents(this.events);
this.events.clear();
}
}
}
/**
* ReadIndexResponse process closure
*/
class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {
final List<ReadIndexState> states;
final ReadIndexRequest request;
ReadIndexResponseClosure(final List<ReadIndexState> states, final ReadIndexRequest request) {
super();
this.states = states;
this.request = request;
}
/**
* Called when ReadIndex response returns.
*/
@Override
public void run(final Status status) {
if (!status.isOk()) {
notifyFail(status);
return;
}
final ReadIndexResponse readIndexResponse = getResponse();
if (!readIndexResponse.success()) {
notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
return;
}
// Success
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
readIndexResponse.index());
for (final ReadIndexState state : this.states) {
// Records current commit log index.
state.setIndex(readIndexResponse.index());
}
boolean doUnlock = true;
ReadOnlyServiceImpl.this.lock.lock();
try {
if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
// Already applied, notify readIndex request.
ReadOnlyServiceImpl.this.lock.unlock();
doUnlock = false;
notifySuccess(readIndexStatus);
}
else {
// Not applied, add it to pending-notify cache.
ReadOnlyServiceImpl.this.pendingNotifyStatus
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
}
}
finally {
if (doUnlock) {
ReadOnlyServiceImpl.this.lock.unlock();
}
}
}
private void notifyFail(final Status status) {
final long nowMs = Utils.monotonicMs();
for (final ReadIndexState state : this.states) {
ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", nowMs - state.getStartTimeMs());
final ReadIndexClosure done = state.getDone();
if (done != null) {
final Bytes reqCtx = state.getRequestContext();
done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, reqCtx != null ? reqCtx.get() : null);
}
}
}
}
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
if (events.isEmpty())
return;
ReadIndexRequestBuilder rb = raftOptions.getRaftMessagesFactory()
.readIndexRequest()
.groupId(this.node.getGroupId())
.serverId(this.node.getServerId().toString());
List<ReadIndexState> states = new ArrayList<>(events.size());
List<ByteString> entries = new ArrayList<>(events.size());
for (ReadIndexEvent event : events) {
byte[] ctx = event.requestContext.get();
entries.add(ctx == null ? ByteString.EMPTY : new ByteString(ctx));
states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
}
ReadIndexRequest request = rb
.entriesList(entries)
.build();
this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}
private void resetPendingStatusError(final Status st) {
this.lock.lock();
try {
for (final List<ReadIndexStatus> statuses : this.pendingNotifyStatus.values()) {
for (final ReadIndexStatus status : statuses) {
reportError(status, st);
}
}
this.pendingNotifyStatus.clear();
}
finally {
this.lock.unlock();
}
}
@Override
public boolean init(final ReadOnlyServiceOptions opts) {
this.groupId = opts.getGroupId();
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.fsmCaller = opts.getFsmCaller();
this.raftOptions = opts.getRaftOptions();
readIndexDisruptor = opts.getReadOnlyServiceDisruptor();
readIndexQueue = readIndexDisruptor.subscribe(groupId, new ReadIndexEventHandler());
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry() //
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
}
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);
// start scanner.
this.node.getTimerManager().scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
}
@Override
public synchronized void setError(final RaftException error) {
if (this.error == null) {
this.error = error;
}
}
@Override
public synchronized void shutdown() {
if (this.shutdownLatch != null) {
return;
}
this.shutdownLatch = new CountDownLatch(1);
Utils.runInThread(this.node.getOptions().getCommonExecutor(),
() -> this.readIndexQueue.publishEvent((event, sequence) -> {
event.groupId = this.groupId;
event.shutdownLatch = this.shutdownLatch;
}));
}
@Override
public void join() throws InterruptedException {
if (this.shutdownLatch != null) {
this.shutdownLatch.await();
}
this.readIndexDisruptor.unsubscribe(groupId);
resetPendingStatusError(new Status(RaftError.ESTOP, "Node is quit."));
}
@Override
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
throw new IllegalStateException("Service already shutdown.");
}
try {
EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
event.groupId = this.groupId;
event.done = closure;
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
};
int retryTimes = 0;
while (true) {
if (this.readIndexQueue.tryPublishEvent(translator)) {
break;
}
else {
retryTimes++;
if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure,
new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
this.nodeMetrics.recordTimes("read-index-overload-times", 1);
LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
return;
}
ThreadHelper.onSpinWait();
}
}
}
catch (final Exception e) {
Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure, new Status(RaftError.EPERM, "Node is down."));
}
}
/**
* Called when lastAppliedIndex updates.
*
* @param appliedIndex applied index
*/
@Override
public void onApplied(final long appliedIndex) {
// TODO reuse pendingStatuses list? https://issues.apache.org/jira/browse/IGNITE-14832
List<ReadIndexStatus> pendingStatuses = null;
this.lock.lock();
try {
if (this.pendingNotifyStatus.isEmpty()) {
return;
}
// Find all statuses that log index less than or equal to appliedIndex.
final Map<Long, List<ReadIndexStatus>> statuses = this.pendingNotifyStatus.headMap(appliedIndex, true);
if (statuses != null) {
pendingStatuses = new ArrayList<>(statuses.size() << 1);
final Iterator<Map.Entry<Long, List<ReadIndexStatus>>> it = statuses.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<Long, List<ReadIndexStatus>> entry = it.next();
pendingStatuses.addAll(entry.getValue());
// Remove the entry from statuses, it will also be removed in pendingNotifyStatus.
it.remove();
}
}
/*
* Remaining pending statuses are notified by error if it is presented.
* When the node is in error state, consider following situations:
* 1. If commitIndex > appliedIndex, then all pending statuses should be notified by error status.
* 2. When commitIndex == appliedIndex, there will be no more pending statuses.
*/
if (this.error != null) {
resetPendingStatusError(this.error.getStatus());
}
}
finally {
this.lock.unlock();
if (pendingStatuses != null && !pendingStatuses.isEmpty()) {
for (final ReadIndexStatus status : pendingStatuses) {
notifySuccess(status);
}
}
}
}
/**
* Flush all events in disruptor.
*/
@OnlyForTest
void flush() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
this.readIndexQueue.publishEvent((task, sequence) -> {
task.groupId = this.groupId;
task.shutdownLatch = latch;
});
latch.await();
}
@OnlyForTest
TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {
return this.pendingNotifyStatus;
}
private void reportError(final ReadIndexStatus status, final Status st) {
final long nowMs = Utils.monotonicMs();
final List<ReadIndexState> states = status.getStates();
final int taskCount = states.size();
for (int i = 0; i < taskCount; i++) {
final ReadIndexState task = states.get(i);
final ReadIndexClosure done = task.getDone();
if (done != null) {
this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
done.run(st);
}
}
}
private void notifySuccess(final ReadIndexStatus status) {
final long nowMs = Utils.monotonicMs();
final List<ReadIndexState> states = status.getStates();
final int taskCount = states.size();
for (int i = 0; i < taskCount; i++) {
final ReadIndexState task = states.get(i);
final ReadIndexClosure done = task.getDone(); // stack copy
if (done != null) {
this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
done.setResult(task.getIndex(), task.getRequestContext().get());
done.run(Status.OK());
}
}
}
}